From 134fe81efb19bca0ec56fbe8bae3eca10b5e5287 Mon Sep 17 00:00:00 2001 From: joshieDo <93316087+joshieDo@users.noreply.github.com> Date: Mon, 31 Jul 2023 15:36:03 +0100 Subject: [PATCH] feat(pruning): prune ChangeSets & History during pipeline (#3728) --- bin/reth/src/node/mod.rs | 7 +- bin/reth/src/stage/dump/merkle.rs | 25 ++- bin/reth/src/stage/run.rs | 22 ++- crates/primitives/src/lib.rs | 2 +- crates/primitives/src/prune/mod.rs | 2 +- crates/primitives/src/prune/part.rs | 12 +- crates/primitives/src/prune/target.rs | 40 ++-- crates/prune/src/error.rs | 3 + crates/prune/src/pruner.rs | 4 +- crates/stages/benches/criterion.rs | 4 +- crates/stages/src/error.rs | 3 + crates/stages/src/stages/execution.rs | 93 +-------- crates/stages/src/stages/hashing_account.rs | 26 ++- crates/stages/src/stages/hashing_storage.rs | 33 +++- .../src/stages/index_account_history.rs | 30 ++- .../src/stages/index_storage_history.rs | 30 ++- crates/stages/src/stages/merkle.rs | 56 ++++-- crates/stages/src/stages/mod.rs | 185 ++++++++++++++++++ crates/storage/provider/src/post_state/mod.rs | 21 +- 19 files changed, 434 insertions(+), 164 deletions(-) diff --git a/bin/reth/src/node/mod.rs b/bin/reth/src/node/mod.rs index 98fa3b1130..ab6bcb623e 100644 --- a/bin/reth/src/node/mod.rs +++ b/bin/reth/src/node/mod.rs @@ -760,12 +760,17 @@ impl Command { .set(AccountHashingStage::new( stage_config.account_hashing.clean_threshold, stage_config.account_hashing.commit_threshold, + config.prune.map(|prune| prune.parts).unwrap_or_default(), )) .set(StorageHashingStage::new( stage_config.storage_hashing.clean_threshold, stage_config.storage_hashing.commit_threshold, + config.prune.map(|prune| prune.parts).unwrap_or_default(), + )) + .set(MerkleStage::new_execution( + stage_config.merkle.clean_threshold, + config.prune.map(|prune| prune.parts).unwrap_or_default(), )) - .set(MerkleStage::new_execution(stage_config.merkle.clean_threshold)) .set(TransactionLookupStage::new(stage_config.transaction_lookup.commit_threshold)) .set(IndexAccountHistoryStage::new( stage_config.index_account_history.commit_threshold, diff --git a/bin/reth/src/stage/dump/merkle.rs b/bin/reth/src/stage/dump/merkle.rs index 69b39234bd..b541687687 100644 --- a/bin/reth/src/stage/dump/merkle.rs +++ b/bin/reth/src/stage/dump/merkle.rs @@ -86,14 +86,22 @@ async fn unwind_and_copy( // Bring hashes to TO - AccountHashingStage { clean_threshold: u64::MAX, commit_threshold: u64::MAX } - .execute(&provider, execute_input) - .await - .unwrap(); - StorageHashingStage { clean_threshold: u64::MAX, commit_threshold: u64::MAX } - .execute(&provider, execute_input) - .await - .unwrap(); + AccountHashingStage { + clean_threshold: u64::MAX, + commit_threshold: u64::MAX, + prune_modes: PruneModes::none(), + } + .execute(&provider, execute_input) + .await + .unwrap(); + StorageHashingStage { + clean_threshold: u64::MAX, + commit_threshold: u64::MAX, + prune_modes: PruneModes::none(), + } + .execute(&provider, execute_input) + .await + .unwrap(); let unwind_inner_tx = provider.into_tx(); @@ -124,6 +132,7 @@ async fn dry_run( clean_threshold: u64::MAX, /* Forces updating the root instead of calculating * from * scratch */ + prune_modes: Default::default(), } .execute( &provider, diff --git a/bin/reth/src/stage/run.rs b/bin/reth/src/stage/run.rs index f53412c3db..9953bfc8a0 100644 --- a/bin/reth/src/stage/run.rs +++ b/bin/reth/src/stage/run.rs @@ -208,12 +208,22 @@ impl Command { ) } StageEnum::TxLookup => (Box::new(TransactionLookupStage::new(batch_size)), None), - StageEnum::AccountHashing => { - (Box::new(AccountHashingStage::new(1, batch_size)), None) - } - StageEnum::StorageHashing => { - (Box::new(StorageHashingStage::new(1, batch_size)), None) - } + StageEnum::AccountHashing => ( + Box::new(AccountHashingStage::new( + 1, + batch_size, + config.prune.map(|prune| prune.parts).unwrap_or_default(), + )), + None, + ), + StageEnum::StorageHashing => ( + Box::new(StorageHashingStage::new( + 1, + batch_size, + config.prune.map(|prune| prune.parts).unwrap_or_default(), + )), + None, + ), StageEnum::Merkle => ( Box::new(MerkleStage::default_execution()), Some(Box::new(MerkleStage::default_unwind())), diff --git a/crates/primitives/src/lib.rs b/crates/primitives/src/lib.rs index 8e82bdd707..249073804f 100644 --- a/crates/primitives/src/lib.rs +++ b/crates/primitives/src/lib.rs @@ -79,7 +79,7 @@ pub use net::{ SEPOLIA_BOOTNODES, }; pub use peer::{PeerId, WithPeerId}; -pub use prune::{PruneCheckpoint, PruneMode, PruneModes, PrunePart}; +pub use prune::{PruneCheckpoint, PruneMode, PruneModes, PrunePart, PrunePartError}; pub use receipt::{Receipt, ReceiptWithBloom, ReceiptWithBloomRef}; pub use revm_primitives::JumpMap; pub use serde_helper::JsonU256; diff --git a/crates/primitives/src/prune/mod.rs b/crates/primitives/src/prune/mod.rs index 4dfc591bfc..a3bcb95962 100644 --- a/crates/primitives/src/prune/mod.rs +++ b/crates/primitives/src/prune/mod.rs @@ -5,5 +5,5 @@ mod target; pub use checkpoint::PruneCheckpoint; pub use mode::PruneMode; -pub use part::PrunePart; +pub use part::{PrunePart, PrunePartError}; pub use target::PruneModes; diff --git a/crates/primitives/src/prune/part.rs b/crates/primitives/src/prune/part.rs index f47ea03d1b..db49870735 100644 --- a/crates/primitives/src/prune/part.rs +++ b/crates/primitives/src/prune/part.rs @@ -1,8 +1,10 @@ +use derive_more::Display; use reth_codecs::{main_codec, Compact}; +use thiserror::Error; /// Part of the data that can be pruned. #[main_codec] -#[derive(Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd, Hash)] +#[derive(Debug, Display, Clone, Copy, Eq, PartialEq, Ord, PartialOrd, Hash)] pub enum PrunePart { /// Prune part responsible for the `TxSenders` table. SenderRecovery, @@ -16,6 +18,14 @@ pub enum PrunePart { StorageHistory, } +/// PrunePart error type. +#[derive(Debug, Error)] +pub enum PrunePartError { + /// Invalid configuration of a prune part. + #[error("The configuration provided for {0} is invalid.")] + Configuration(PrunePart), +} + #[cfg(test)] impl Default for PrunePart { fn default() -> Self { diff --git a/crates/primitives/src/prune/target.rs b/crates/primitives/src/prune/target.rs index b314ae9d03..af6781897c 100644 --- a/crates/primitives/src/prune/target.rs +++ b/crates/primitives/src/prune/target.rs @@ -1,4 +1,7 @@ -use crate::{serde_helper::deserialize_opt_prune_mode_with_min_blocks, BlockNumber, PruneMode}; +use crate::{ + prune::PrunePartError, serde_helper::deserialize_opt_prune_mode_with_min_blocks, BlockNumber, + PruneMode, PrunePart, +}; use paste::paste; use serde::{Deserialize, Serialize}; @@ -19,10 +22,16 @@ pub struct PruneModes { )] pub receipts: Option, /// Account History pruning configuration. - #[serde(skip_serializing_if = "Option::is_none")] + #[serde( + skip_serializing_if = "Option::is_none", + deserialize_with = "deserialize_opt_prune_mode_with_min_blocks::<64, _>" + )] pub account_history: Option, /// Storage History pruning configuration. - #[serde(skip_serializing_if = "Option::is_none")] + #[serde( + skip_serializing_if = "Option::is_none", + deserialize_with = "deserialize_opt_prune_mode_with_min_blocks::<64, _>" + )] pub storage_history: Option, } @@ -51,12 +60,15 @@ macro_rules! impl_prune_parts { $human_part, " pruning needs to be done, inclusive, according to the provided tip." )] - pub fn [](&self, tip: BlockNumber) -> Option<(BlockNumber, PruneMode)> { - self.$part.as_ref().and_then(|mode| { - self.prune_target_block(mode, tip, $min_blocks).map(|block| { - (block, *mode) - }) - }) + pub fn [](&self, tip: BlockNumber) -> Result, PrunePartError> { + match &self.$part { + Some(mode) => + match self.prune_target_block(mode, tip, $min_blocks) { + Some(block) => Ok(Some((block, *mode))), + None => Err(PrunePartError::Configuration(PrunePart::[<$human_part>])) + } + None => Ok(None) + } } } )+ @@ -107,17 +119,17 @@ impl PruneModes { Some(tip.saturating_sub(*distance)) } PruneMode::Before(n) if tip.saturating_sub(*n) >= min_blocks.unwrap_or_default() => { - Some(*n) + Some(n.saturating_sub(1)) } _ => None, } } impl_prune_parts!( - (sender_recovery, "Sender Recovery", None), - (transaction_lookup, "Transaction Lookup", None), + (sender_recovery, "SenderRecovery", None), + (transaction_lookup, "TransactionLookup", None), (receipts, "Receipts", Some(64)), - (account_history, "Account History", None), - (storage_history, "Storage History", None) + (account_history, "AccountHistory", Some(64)), + (storage_history, "StorageHistory", Some(64)) ); } diff --git a/crates/prune/src/error.rs b/crates/prune/src/error.rs index fdc0af4484..1a31a03942 100644 --- a/crates/prune/src/error.rs +++ b/crates/prune/src/error.rs @@ -4,6 +4,9 @@ use thiserror::Error; #[derive(Error, Debug)] pub enum PrunerError { + #[error(transparent)] + PrunePart(#[from] reth_primitives::PrunePartError), + #[error("Inconsistent data: {0}")] InconsistentData(&'static str), diff --git a/crates/prune/src/pruner.rs b/crates/prune/src/pruner.rs index 96f41c9ae2..3dae8bcb04 100644 --- a/crates/prune/src/pruner.rs +++ b/crates/prune/src/pruner.rs @@ -72,13 +72,13 @@ impl Pruner { let provider = self.provider_factory.provider_rw()?; if let Some((to_block, prune_mode)) = - self.modes.prune_target_block_receipts(tip_block_number) + self.modes.prune_target_block_receipts(tip_block_number)? { self.prune_receipts(&provider, to_block, prune_mode)?; } if let Some((to_block, prune_mode)) = - self.modes.prune_target_block_transaction_lookup(tip_block_number) + self.modes.prune_target_block_transaction_lookup(tip_block_number)? { self.prune_transaction_lookup(&provider, to_block, prune_mode)?; } diff --git a/crates/stages/benches/criterion.rs b/crates/stages/benches/criterion.rs index 8fce2e3703..d9b079d298 100644 --- a/crates/stages/benches/criterion.rs +++ b/crates/stages/benches/criterion.rs @@ -95,7 +95,7 @@ fn merkle(c: &mut Criterion) { // don't need to run each stage for that many times group.sample_size(10); - let stage = MerkleStage::Both { clean_threshold: u64::MAX }; + let stage = MerkleStage::Both { clean_threshold: u64::MAX, prune_modes: Default::default() }; measure_stage( &mut group, setup::unwind_hashes, @@ -104,7 +104,7 @@ fn merkle(c: &mut Criterion) { "Merkle-incremental".to_string(), ); - let stage = MerkleStage::Both { clean_threshold: 0 }; + let stage = MerkleStage::Both { clean_threshold: 0, prune_modes: Default::default() }; measure_stage( &mut group, setup::unwind_hashes, diff --git a/crates/stages/src/error.rs b/crates/stages/src/error.rs index 20310111ca..b5158f3e15 100644 --- a/crates/stages/src/error.rs +++ b/crates/stages/src/error.rs @@ -49,6 +49,9 @@ pub enum StageError { #[source] error: executor::BlockExecutionError, }, + /// Invalid pruning configuration + #[error(transparent)] + PruningConfiguration(#[from] reth_primitives::PrunePartError), /// Invalid checkpoint passed to the stage #[error("Invalid stage checkpoint: {0}")] StageCheckpoint(u64), diff --git a/crates/stages/src/stages/execution.rs b/crates/stages/src/stages/execution.rs index 366e10e7c3..fc8e789a1f 100644 --- a/crates/stages/src/stages/execution.rs +++ b/crates/stages/src/stages/execution.rs @@ -60,7 +60,7 @@ pub struct ExecutionStage { /// The commit thresholds of the execution stage. thresholds: ExecutionStageThresholds, /// Pruning configuration. - prune_targets: PruneModes, + prune_modes: PruneModes, } impl ExecutionStage { @@ -68,9 +68,9 @@ impl ExecutionStage { pub fn new( executor_factory: EF, thresholds: ExecutionStageThresholds, - prune_targets: PruneModes, + prune_modes: PruneModes, ) -> Self { - Self { metrics_tx: None, executor_factory, thresholds, prune_targets } + Self { metrics_tx: None, executor_factory, thresholds, prune_modes } } /// Create an execution stage with the provided executor factory. @@ -110,7 +110,7 @@ impl ExecutionStage { // Execute block range let mut state = PostState::default(); - state.add_prune_targets(self.prune_targets); + state.add_prune_modes(self.prune_modes); for block_number in start_block..=max_block { let td = provider @@ -425,8 +425,7 @@ mod tests { use reth_db::{models::AccountBeforeTx, test_utils::create_test_rw_db}; use reth_primitives::{ hex_literal::hex, keccak256, stage::StageUnitCheckpoint, Account, Bytecode, - ChainSpecBuilder, PruneMode, PruneModes, SealedBlock, StorageEntry, H160, H256, MAINNET, - U256, + ChainSpecBuilder, PruneModes, SealedBlock, StorageEntry, H160, H256, MAINNET, U256, }; use reth_provider::{AccountReader, BlockWriter, ProviderFactory, ReceiptProvider}; use reth_revm::Factory; @@ -894,86 +893,4 @@ mod tests { ] ); } - - #[tokio::test] - async fn test_prune() { - let test_tx = TestTransaction::default(); - let factory = Arc::new(ProviderFactory::new(test_tx.tx.as_ref(), MAINNET.clone())); - - let provider = factory.provider_rw().unwrap(); - let input = ExecInput { - target: Some(1), - /// The progress of this stage the last time it was executed. - checkpoint: None, - }; - let mut genesis_rlp = hex!("f901faf901f5a00000000000000000000000000000000000000000000000000000000000000000a01dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347942adc25665018aa1fe0e6bc666dac8fc2697ff9baa045571b40ae66ca7480791bbb2887286e4e4c4b1b298b191c889d6959023a32eda056e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421a056e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421b901000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000083020000808502540be400808000a00000000000000000000000000000000000000000000000000000000000000000880000000000000000c0c0").as_slice(); - let genesis = SealedBlock::decode(&mut genesis_rlp).unwrap(); - let mut block_rlp = hex!("f90262f901f9a075c371ba45999d87f4542326910a11af515897aebce5265d3f6acd1f1161f82fa01dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347942adc25665018aa1fe0e6bc666dac8fc2697ff9baa098f2dcd87c8ae4083e7017a05456c14eea4b1db2032126e27b3b1563d57d7cc0a08151d548273f6683169524b66ca9fe338b9ce42bc3540046c828fd939ae23bcba03f4e5c2ec5b2170b711d97ee755c160457bb58d8daa338e835ec02ae6860bbabb901000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000083020000018502540be40082a8798203e800a00000000000000000000000000000000000000000000000000000000000000000880000000000000000f863f861800a8405f5e10094100000000000000000000000000000000000000080801ba07e09e26678ed4fac08a249ebe8ed680bf9051a5e14ad223e4b2b9d26e0208f37a05f6e3f188e3e6eab7d7d3b6568f5eac7d687b08d307d3154ccd8c87b4630509bc0").as_slice(); - let block = SealedBlock::decode(&mut block_rlp).unwrap(); - provider.insert_block(genesis, None).unwrap(); - provider.insert_block(block.clone(), None).unwrap(); - provider.commit().unwrap(); - - // insert pre state - let provider = factory.provider_rw().unwrap(); - let code = hex!("5a465a905090036002900360015500"); - let code_hash = keccak256(hex!("5a465a905090036002900360015500")); - provider - .tx_ref() - .put::( - H160(hex!("1000000000000000000000000000000000000000")), - Account { nonce: 0, balance: U256::ZERO, bytecode_hash: Some(code_hash) }, - ) - .unwrap(); - provider - .tx_ref() - .put::( - H160(hex!("a94f5374fce5edbc8e2a8697c15331677e6ebf0b")), - Account { - nonce: 0, - balance: U256::from(0x3635c9adc5dea00000u128), - bytecode_hash: None, - }, - ) - .unwrap(); - provider - .tx_ref() - .put::(code_hash, Bytecode::new_raw(code.to_vec().into())) - .unwrap(); - provider.commit().unwrap(); - - let check_pruning = |factory: Arc>, - prune_targets: PruneModes, - expect_num_receipts: usize| async move { - let provider = factory.provider_rw().unwrap(); - - let mut execution_stage = ExecutionStage::new( - Factory::new(Arc::new(ChainSpecBuilder::mainnet().berlin_activated().build())), - ExecutionStageThresholds { max_blocks: Some(100), max_changes: None }, - prune_targets, - ); - - execution_stage.execute(&provider, input).await.unwrap(); - assert_eq!( - provider.receipts_by_block(1.into()).unwrap().unwrap().len(), - expect_num_receipts - ); - }; - - let mut prune = PruneModes::none(); - - check_pruning(factory.clone(), prune, 1).await; - - prune.receipts = Some(PruneMode::Full); - check_pruning(factory.clone(), prune, 0).await; - - prune.receipts = Some(PruneMode::Before(1)); - check_pruning(factory.clone(), prune, 1).await; - - prune.receipts = Some(PruneMode::Before(2)); - check_pruning(factory.clone(), prune, 0).await; - - prune.receipts = Some(PruneMode::Distance(0)); - check_pruning(factory.clone(), prune, 1).await; - } } diff --git a/crates/stages/src/stages/hashing_account.rs b/crates/stages/src/stages/hashing_account.rs index ccb6fb960f..7eed29bd33 100644 --- a/crates/stages/src/stages/hashing_account.rs +++ b/crates/stages/src/stages/hashing_account.rs @@ -15,6 +15,7 @@ use reth_primitives::{ AccountHashingCheckpoint, CheckpointBlockRange, EntitiesCheckpoint, StageCheckpoint, StageId, }, + PruneModes, }; use reth_provider::{AccountExtReader, DatabaseProviderRW, HashingWriter}; use std::{ @@ -34,18 +35,25 @@ pub struct AccountHashingStage { pub clean_threshold: u64, /// The maximum number of accounts to process before committing. pub commit_threshold: u64, + /// Prune mode configuration. Required to know if we can actually make an incremental + /// update based on how many changesets exist. + pub prune_modes: PruneModes, } impl AccountHashingStage { /// Create new instance of [AccountHashingStage]. - pub fn new(clean_threshold: u64, commit_threshold: u64) -> Self { - Self { clean_threshold, commit_threshold } + pub fn new(clean_threshold: u64, commit_threshold: u64, prune_modes: PruneModes) -> Self { + Self { clean_threshold, commit_threshold, prune_modes } } } impl Default for AccountHashingStage { fn default() -> Self { - Self { clean_threshold: 500_000, commit_threshold: 100_000 } + Self { + clean_threshold: 500_000, + commit_threshold: 100_000, + prune_modes: PruneModes::default(), + } } } @@ -143,12 +151,19 @@ impl Stage for AccountHashingStage { } let (from_block, to_block) = input.next_block_range().into_inner(); + let has_enough_changesets = self + .prune_modes + .prune_target_block_account_history(to_block)? + .map(|(block_number, _)| block_number) + .unwrap_or_default() < + from_block; // if there are more blocks then threshold it is faster to go over Plain state and hash all // account otherwise take changesets aggregate the sets and apply hashing to // AccountHashing table. Also, if we start from genesis, we need to hash from scratch, as // genesis accounts are not in changeset. - if to_block - from_block > self.clean_threshold || from_block == 1 { + if to_block - from_block > self.clean_threshold || from_block == 1 || !has_enough_changesets + { let tx = provider.tx_ref(); let stage_checkpoint = input .checkpoint @@ -448,6 +463,7 @@ mod tests { pub(crate) tx: TestTransaction, commit_threshold: u64, clean_threshold: u64, + prune_modes: PruneModes, } impl AccountHashingTestRunner { @@ -511,6 +527,7 @@ mod tests { tx: TestTransaction::default(), commit_threshold: 1000, clean_threshold: 1000, + prune_modes: PruneModes::default(), } } } @@ -526,6 +543,7 @@ mod tests { Self::S { commit_threshold: self.commit_threshold, clean_threshold: self.clean_threshold, + prune_modes: self.prune_modes, } } } diff --git a/crates/stages/src/stages/hashing_storage.rs b/crates/stages/src/stages/hashing_storage.rs index 040b6375bd..c6a65b3d80 100644 --- a/crates/stages/src/stages/hashing_storage.rs +++ b/crates/stages/src/stages/hashing_storage.rs @@ -14,7 +14,7 @@ use reth_primitives::{ CheckpointBlockRange, EntitiesCheckpoint, StageCheckpoint, StageId, StorageHashingCheckpoint, }, - StorageEntry, + PruneModes, StorageEntry, }; use reth_provider::{DatabaseProviderRW, HashingWriter, StorageReader}; use std::{collections::BTreeMap, fmt::Debug}; @@ -29,18 +29,25 @@ pub struct StorageHashingStage { pub clean_threshold: u64, /// The maximum number of slots to process before committing. pub commit_threshold: u64, + /// Prune mode configuration. Required to know if we can actually make an incremental + /// update based on how many changesets exist. + pub prune_modes: PruneModes, } impl StorageHashingStage { /// Create new instance of [StorageHashingStage]. - pub fn new(clean_threshold: u64, commit_threshold: u64) -> Self { - Self { clean_threshold, commit_threshold } + pub fn new(clean_threshold: u64, commit_threshold: u64, prune_modes: PruneModes) -> Self { + Self { clean_threshold, commit_threshold, prune_modes } } } impl Default for StorageHashingStage { fn default() -> Self { - Self { clean_threshold: 500_000, commit_threshold: 100_000 } + Self { + clean_threshold: 500_000, + commit_threshold: 100_000, + prune_modes: PruneModes::default(), + } } } @@ -63,12 +70,19 @@ impl Stage for StorageHashingStage { } let (from_block, to_block) = input.next_block_range().into_inner(); + let has_enough_changesets = self + .prune_modes + .prune_target_block_storage_history(to_block)? + .map(|(block_number, _)| block_number) + .unwrap_or_default() < + from_block; // if there are more blocks then threshold it is faster to go over Plain state and hash all // account otherwise take changesets aggregate the sets and apply hashing to // AccountHashing table. Also, if we start from genesis, we need to hash from scratch, as // genesis accounts are not in changeset, along with their storages. - if to_block - from_block > self.clean_threshold || from_block == 1 { + if to_block - from_block > self.clean_threshold || from_block == 1 || !has_enough_changesets + { let stage_checkpoint = input .checkpoint .and_then(|checkpoint| checkpoint.storage_hashing_stage_checkpoint()); @@ -457,11 +471,17 @@ mod tests { tx: TestTransaction, commit_threshold: u64, clean_threshold: u64, + prune_modes: PruneModes, } impl Default for StorageHashingTestRunner { fn default() -> Self { - Self { tx: TestTransaction::default(), commit_threshold: 1000, clean_threshold: 1000 } + Self { + tx: TestTransaction::default(), + commit_threshold: 1000, + clean_threshold: 1000, + prune_modes: PruneModes::default(), + } } } @@ -476,6 +496,7 @@ mod tests { Self::S { commit_threshold: self.commit_threshold, clean_threshold: self.clean_threshold, + prune_modes: self.prune_modes, } } } diff --git a/crates/stages/src/stages/index_account_history.rs b/crates/stages/src/stages/index_account_history.rs index fe0b6d3b40..d259ec3f81 100644 --- a/crates/stages/src/stages/index_account_history.rs +++ b/crates/stages/src/stages/index_account_history.rs @@ -1,6 +1,9 @@ use crate::{ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput}; use reth_db::database::Database; -use reth_primitives::stage::{StageCheckpoint, StageId}; +use reth_primitives::{ + stage::{StageCheckpoint, StageId}, + PruneModes, +}; use reth_provider::{AccountExtReader, DatabaseProviderRW, HistoryWriter}; use std::fmt::Debug; @@ -12,18 +15,20 @@ pub struct IndexAccountHistoryStage { /// Number of blocks after which the control /// flow will be returned to the pipeline for commit. pub commit_threshold: u64, + /// Pruning configuration. + pub prune_modes: PruneModes, } impl IndexAccountHistoryStage { /// Create new instance of [IndexAccountHistoryStage]. pub fn new(commit_threshold: u64) -> Self { - Self { commit_threshold } + Self { commit_threshold, prune_modes: PruneModes::default() } } } impl Default for IndexAccountHistoryStage { fn default() -> Self { - Self { commit_threshold: 100_000 } + Self { commit_threshold: 100_000, prune_modes: PruneModes::default() } } } @@ -38,8 +43,16 @@ impl Stage for IndexAccountHistoryStage { async fn execute( &mut self, provider: &DatabaseProviderRW<'_, &DB>, - input: ExecInput, + mut input: ExecInput, ) -> Result { + if let Some((target_prunable_block, _)) = + self.prune_modes.prune_target_block_account_history(input.target())? + { + if target_prunable_block > input.checkpoint().block_number { + input.checkpoint = Some(StageCheckpoint::new(target_prunable_block)); + } + } + if input.target_reached() { return Ok(ExecOutput::done(input.checkpoint())) } @@ -372,11 +385,16 @@ mod tests { struct IndexAccountHistoryTestRunner { pub(crate) tx: TestTransaction, commit_threshold: u64, + prune_modes: PruneModes, } impl Default for IndexAccountHistoryTestRunner { fn default() -> Self { - Self { tx: TestTransaction::default(), commit_threshold: 1000 } + Self { + tx: TestTransaction::default(), + commit_threshold: 1000, + prune_modes: PruneModes::default(), + } } } @@ -388,7 +406,7 @@ mod tests { } fn stage(&self) -> Self::S { - Self::S { commit_threshold: self.commit_threshold } + Self::S { commit_threshold: self.commit_threshold, prune_modes: self.prune_modes } } } diff --git a/crates/stages/src/stages/index_storage_history.rs b/crates/stages/src/stages/index_storage_history.rs index a17c5f14e7..4759cd82c5 100644 --- a/crates/stages/src/stages/index_storage_history.rs +++ b/crates/stages/src/stages/index_storage_history.rs @@ -1,6 +1,9 @@ use crate::{ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput}; use reth_db::{database::Database, models::BlockNumberAddress}; -use reth_primitives::stage::{StageCheckpoint, StageId}; +use reth_primitives::{ + stage::{StageCheckpoint, StageId}, + PruneModes, +}; use reth_provider::{DatabaseProviderRW, HistoryWriter, StorageReader}; use std::fmt::Debug; @@ -12,18 +15,20 @@ pub struct IndexStorageHistoryStage { /// Number of blocks after which the control /// flow will be returned to the pipeline for commit. pub commit_threshold: u64, + /// Pruning configuration. + pub prune_modes: PruneModes, } impl IndexStorageHistoryStage { /// Create new instance of [IndexStorageHistoryStage]. pub fn new(commit_threshold: u64) -> Self { - Self { commit_threshold } + Self { commit_threshold, prune_modes: PruneModes::default() } } } impl Default for IndexStorageHistoryStage { fn default() -> Self { - Self { commit_threshold: 100_000 } + Self { commit_threshold: 100_000, prune_modes: PruneModes::default() } } } @@ -38,8 +43,16 @@ impl Stage for IndexStorageHistoryStage { async fn execute( &mut self, provider: &DatabaseProviderRW<'_, &DB>, - input: ExecInput, + mut input: ExecInput, ) -> Result { + if let Some((target_prunable_block, _)) = + self.prune_modes.prune_target_block_storage_history(input.target())? + { + if target_prunable_block > input.checkpoint().block_number { + input.checkpoint = Some(StageCheckpoint::new(target_prunable_block)); + } + } + if input.target_reached() { return Ok(ExecOutput::done(input.checkpoint())) } @@ -386,11 +399,16 @@ mod tests { struct IndexStorageHistoryTestRunner { pub(crate) tx: TestTransaction, commit_threshold: u64, + prune_modes: PruneModes, } impl Default for IndexStorageHistoryTestRunner { fn default() -> Self { - Self { tx: TestTransaction::default(), commit_threshold: 1000 } + Self { + tx: TestTransaction::default(), + commit_threshold: 1000, + prune_modes: PruneModes::default(), + } } } @@ -402,7 +420,7 @@ mod tests { } fn stage(&self) -> Self::S { - Self::S { commit_threshold: self.commit_threshold } + Self::S { commit_threshold: self.commit_threshold, prune_modes: self.prune_modes } } } diff --git a/crates/stages/src/stages/merkle.rs b/crates/stages/src/stages/merkle.rs index 1a95341d85..ffcc427b58 100644 --- a/crates/stages/src/stages/merkle.rs +++ b/crates/stages/src/stages/merkle.rs @@ -10,7 +10,7 @@ use reth_primitives::{ hex, stage::{EntitiesCheckpoint, MerkleCheckpoint, StageCheckpoint, StageId}, trie::StoredSubNode, - BlockNumber, SealedHeader, H256, + BlockNumber, PruneModes, SealedHeader, H256, }; use reth_provider::{ DatabaseProviderRW, HeaderProvider, ProviderError, StageCheckpointReader, StageCheckpointWriter, @@ -47,6 +47,9 @@ pub enum MerkleStage { /// The threshold (in number of blocks) for switching from incremental trie building /// of changes to whole rebuild. clean_threshold: u64, + /// Prune mode configuration. Required to know if we can actually make an incremental root + /// update based on how many changesets exist. + prune_modes: PruneModes, }, /// The unwind portion of the merkle stage. Unwind, @@ -54,13 +57,13 @@ pub enum MerkleStage { /// Able to execute and unwind. Used for tests #[cfg(any(test, feature = "test-utils"))] #[allow(missing_docs)] - Both { clean_threshold: u64 }, + Both { clean_threshold: u64, prune_modes: PruneModes }, } impl MerkleStage { /// Stage default for the [MerkleStage::Execution]. pub fn default_execution() -> Self { - Self::Execution { clean_threshold: 50_000 } + Self::Execution { clean_threshold: 50_000, prune_modes: PruneModes::default() } } /// Stage default for the [MerkleStage::Unwind]. @@ -69,8 +72,8 @@ impl MerkleStage { } /// Create new instance of [MerkleStage::Execution]. - pub fn new_execution(clean_threshold: u64) -> Self { - Self::Execution { clean_threshold } + pub fn new_execution(clean_threshold: u64, prune_modes: PruneModes) -> Self { + Self::Execution { clean_threshold, prune_modes } } /// Check that the computed state root matches the root in the expected header. @@ -128,6 +131,26 @@ impl MerkleStage { } Ok(provider.save_stage_checkpoint_progress(StageId::MerkleExecute, buf)?) } + + /// When pruning is enabled for account and storage history, we might not have all changesets + /// required for an incremental state root update on a pipeline re-run. + pub fn has_enough_changesets( + &self, + prune_modes: PruneModes, + from_block: BlockNumber, + to_block: BlockNumber, + ) -> Result { + Ok(prune_modes + .prune_target_block_account_history(to_block)? + .map(|(block_number, _)| block_number) + .unwrap_or_default() < + from_block && + prune_modes + .prune_target_block_storage_history(to_block)? + .map(|(block_number, _)| block_number) + .unwrap_or_default() < + from_block) + } } #[async_trait::async_trait] @@ -148,14 +171,16 @@ impl Stage for MerkleStage { provider: &DatabaseProviderRW<'_, &DB>, input: ExecInput, ) -> Result { - let threshold = match self { + let (threshold, prune_modes) = match self { MerkleStage::Unwind => { info!(target: "sync::stages::merkle::unwind", "Stage is always skipped"); return Ok(ExecOutput::done(StageCheckpoint::new(input.target()))) } - MerkleStage::Execution { clean_threshold } => *clean_threshold, + MerkleStage::Execution { clean_threshold, prune_modes } => { + (*clean_threshold, *prune_modes) + } #[cfg(any(test, feature = "test-utils"))] - MerkleStage::Both { clean_threshold } => *clean_threshold, + MerkleStage::Both { clean_threshold, prune_modes } => (*clean_threshold, *prune_modes), }; let range = input.next_block_range(); @@ -168,10 +193,12 @@ impl Stage for MerkleStage { let target_block_root = target_block.state_root; let mut checkpoint = self.get_execution_checkpoint(provider)?; - let (trie_root, entities_checkpoint) = if range.is_empty() { (target_block_root, input.checkpoint().entities_stage_checkpoint().unwrap_or_default()) - } else if to_block - from_block > threshold || from_block == 1 { + } else if to_block - from_block > threshold || + from_block == 1 || + !self.has_enough_changesets(prune_modes, from_block, to_block)? + { // if there are more blocks than threshold it is faster to rebuild the trie let mut entities_checkpoint = if let Some(checkpoint) = checkpoint.as_ref().filter(|c| c.target_block == to_block) @@ -445,11 +472,16 @@ mod tests { struct MerkleTestRunner { tx: TestTransaction, clean_threshold: u64, + prune_modes: PruneModes, } impl Default for MerkleTestRunner { fn default() -> Self { - Self { tx: TestTransaction::default(), clean_threshold: 10000 } + Self { + tx: TestTransaction::default(), + clean_threshold: 10000, + prune_modes: PruneModes::default(), + } } } @@ -461,7 +493,7 @@ mod tests { } fn stage(&self) -> Self::S { - Self::S::Both { clean_threshold: self.clean_threshold } + Self::S::Both { clean_threshold: self.clean_threshold, prune_modes: self.prune_modes } } } diff --git a/crates/stages/src/stages/mod.rs b/crates/stages/src/stages/mod.rs index 96f1ec5fec..3cf295abeb 100644 --- a/crates/stages/src/stages/mod.rs +++ b/crates/stages/src/stages/mod.rs @@ -35,3 +35,188 @@ pub use merkle::*; pub use sender_recovery::*; pub use total_difficulty::*; pub use tx_lookup::*; + +#[cfg(test)] +mod tests { + use super::*; + use crate::{ + stage::Stage, + stages::{ExecutionStage, IndexAccountHistoryStage, IndexStorageHistoryStage}, + test_utils::TestTransaction, + ExecInput, + }; + use reth_db::{ + cursor::DbCursorRO, + mdbx::{cursor::Cursor, RW}, + tables, + transaction::{DbTx, DbTxMut}, + AccountHistory, DatabaseEnv, + }; + use reth_interfaces::test_utils::generators::{self, random_block}; + use reth_primitives::{ + hex_literal::hex, keccak256, Account, Bytecode, ChainSpecBuilder, PruneMode, PruneModes, + SealedBlock, H160, MAINNET, U256, + }; + use reth_provider::{ + AccountExtReader, BlockWriter, DatabaseProviderRW, ProviderFactory, ReceiptProvider, + StorageReader, + }; + use reth_revm::Factory; + use reth_rlp::Decodable; + use std::sync::Arc; + + #[tokio::test] + async fn test_prune() { + let test_tx = TestTransaction::default(); + let factory = Arc::new(ProviderFactory::new(test_tx.tx.as_ref(), MAINNET.clone())); + + let provider = factory.provider_rw().unwrap(); + let tip = 66; + let input = ExecInput { + target: Some(tip), + /// The progress of this stage the last time it was executed. + checkpoint: None, + }; + let mut genesis_rlp = hex!("f901faf901f5a00000000000000000000000000000000000000000000000000000000000000000a01dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347942adc25665018aa1fe0e6bc666dac8fc2697ff9baa045571b40ae66ca7480791bbb2887286e4e4c4b1b298b191c889d6959023a32eda056e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421a056e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421b901000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000083020000808502540be400808000a00000000000000000000000000000000000000000000000000000000000000000880000000000000000c0c0").as_slice(); + let genesis = SealedBlock::decode(&mut genesis_rlp).unwrap(); + let mut block_rlp = hex!("f90262f901f9a075c371ba45999d87f4542326910a11af515897aebce5265d3f6acd1f1161f82fa01dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347942adc25665018aa1fe0e6bc666dac8fc2697ff9baa098f2dcd87c8ae4083e7017a05456c14eea4b1db2032126e27b3b1563d57d7cc0a08151d548273f6683169524b66ca9fe338b9ce42bc3540046c828fd939ae23bcba03f4e5c2ec5b2170b711d97ee755c160457bb58d8daa338e835ec02ae6860bbabb901000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000083020000018502540be40082a8798203e800a00000000000000000000000000000000000000000000000000000000000000000880000000000000000f863f861800a8405f5e10094100000000000000000000000000000000000000080801ba07e09e26678ed4fac08a249ebe8ed680bf9051a5e14ad223e4b2b9d26e0208f37a05f6e3f188e3e6eab7d7d3b6568f5eac7d687b08d307d3154ccd8c87b4630509bc0").as_slice(); + let block = SealedBlock::decode(&mut block_rlp).unwrap(); + provider.insert_block(genesis, None).unwrap(); + provider.insert_block(block.clone(), None).unwrap(); + + // Fill with bogus blocks to respect PruneMode distance. + let mut head = block.hash; + let mut rng = generators::rng(); + for block_number in 2..=tip { + let nblock = random_block(&mut rng, block_number, Some(head), Some(0), Some(0)); + head = nblock.hash; + provider.insert_block(nblock, None).unwrap(); + } + provider.commit().unwrap(); + + // insert pre state + let provider = factory.provider_rw().unwrap(); + let code = hex!("5a465a905090036002900360015500"); + let code_hash = keccak256(hex!("5a465a905090036002900360015500")); + provider + .tx_ref() + .put::( + H160(hex!("1000000000000000000000000000000000000000")), + Account { nonce: 0, balance: U256::ZERO, bytecode_hash: Some(code_hash) }, + ) + .unwrap(); + provider + .tx_ref() + .put::( + H160(hex!("a94f5374fce5edbc8e2a8697c15331677e6ebf0b")), + Account { + nonce: 0, + balance: U256::from(0x3635c9adc5dea00000u128), + bytecode_hash: None, + }, + ) + .unwrap(); + provider + .tx_ref() + .put::(code_hash, Bytecode::new_raw(code.to_vec().into())) + .unwrap(); + provider.commit().unwrap(); + + let check_pruning = |factory: Arc>, + prune_modes: PruneModes, + expect_num_receipts: usize, + expect_num_acc_changesets: usize, + expect_num_storage_changesets: usize| async move { + let provider: DatabaseProviderRW<'_, &DatabaseEnv> = factory.provider_rw().unwrap(); + + // Check execution and create receipts and changesets according to the pruning + // configuration + let mut execution_stage = ExecutionStage::new( + Factory::new(Arc::new(ChainSpecBuilder::mainnet().berlin_activated().build())), + ExecutionStageThresholds { max_blocks: Some(100), max_changes: None }, + prune_modes, + ); + + execution_stage.execute(&provider, input).await.unwrap(); + assert_eq!( + provider.receipts_by_block(1.into()).unwrap().unwrap().len(), + expect_num_receipts + ); + + assert_eq!( + provider.changed_storages_and_blocks_with_range(0..=1000).unwrap().len(), + expect_num_storage_changesets + ); + + assert_eq!( + provider.changed_accounts_and_blocks_with_range(0..=1000).unwrap().len(), + expect_num_acc_changesets + ); + + // Check AccountHistory + let mut acc_indexing_stage = + IndexAccountHistoryStage { prune_modes, ..Default::default() }; + + if let Some(PruneMode::Full) = prune_modes.account_history { + // Full is not supported + assert!(acc_indexing_stage.execute(&provider, input).await.is_err()); + } else { + acc_indexing_stage.execute(&provider, input).await.unwrap(); + let mut account_history: Cursor<'_, RW, AccountHistory> = + provider.tx_ref().cursor_read::().unwrap(); + assert_eq!(account_history.walk(None).unwrap().count(), expect_num_acc_changesets); + } + + // Check StorageHistory + let mut storage_indexing_stage = + IndexStorageHistoryStage { prune_modes, ..Default::default() }; + + if let Some(PruneMode::Full) = prune_modes.storage_history { + // Full is not supported + assert!(acc_indexing_stage.execute(&provider, input).await.is_err()); + } else { + storage_indexing_stage.execute(&provider, input).await.unwrap(); + + let mut storage_history = + provider.tx_ref().cursor_read::().unwrap(); + assert_eq!( + storage_history.walk(None).unwrap().count(), + expect_num_storage_changesets + ); + } + }; + + // In an unpruned configuration there is 1 receipt, 3 changed accounts and 1 changed + // storage. + let mut prune = PruneModes::none(); + check_pruning(factory.clone(), prune, 1, 3, 1).await; + + prune.receipts = Some(PruneMode::Full); + prune.account_history = Some(PruneMode::Full); + prune.storage_history = Some(PruneMode::Full); + // This will result in error for account_history and storage_history, which is caught. + check_pruning(factory.clone(), prune, 0, 0, 0).await; + + prune.receipts = Some(PruneMode::Before(1)); + prune.account_history = Some(PruneMode::Before(1)); + prune.storage_history = Some(PruneMode::Before(1)); + check_pruning(factory.clone(), prune, 1, 3, 1).await; + + prune.receipts = Some(PruneMode::Before(2)); + prune.account_history = Some(PruneMode::Before(2)); + prune.storage_history = Some(PruneMode::Before(2)); + // The one account is the miner + check_pruning(factory.clone(), prune, 0, 1, 0).await; + + prune.receipts = Some(PruneMode::Distance(66)); + prune.account_history = Some(PruneMode::Distance(66)); + prune.storage_history = Some(PruneMode::Distance(66)); + check_pruning(factory.clone(), prune, 1, 3, 1).await; + + prune.receipts = Some(PruneMode::Distance(64)); + prune.account_history = Some(PruneMode::Distance(64)); + prune.storage_history = Some(PruneMode::Distance(64)); + // The one account is the miner + check_pruning(factory.clone(), prune, 0, 1, 0).await; + } +} diff --git a/crates/storage/provider/src/post_state/mod.rs b/crates/storage/provider/src/post_state/mod.rs index 846012bffa..300b7e27d6 100644 --- a/crates/storage/provider/src/post_state/mod.rs +++ b/crates/storage/provider/src/post_state/mod.rs @@ -79,7 +79,7 @@ pub struct PostState { /// The receipt(s) of the executed transaction(s). receipts: BTreeMap>, /// Pruning configuration. - prune_targets: PruneModes, + prune_modes: PruneModes, } impl PostState { @@ -94,8 +94,8 @@ impl PostState { } /// Add a pruning configuration. - pub fn add_prune_targets(&mut self, prune_targets: PruneModes) { - self.prune_targets = prune_targets; + pub fn add_prune_modes(&mut self, prune_modes: PruneModes) { + self.prune_modes = prune_modes; } /// Return the current size of the poststate. @@ -518,6 +518,7 @@ impl PostState { pub fn write_history_to_db<'a, TX: DbTxMut<'a> + DbTx<'a>>( &mut self, tx: &TX, + tip: BlockNumber, ) -> Result<(), DbError> { // Write storage changes tracing::trace!(target: "provider::post_state", "Writing storage changes"); @@ -560,6 +561,10 @@ impl PostState { } } + if self.prune_modes.should_prune_storage_history(block_number, tip) { + continue + } + for (slot, old_value) in storage.storage { tracing::trace!(target: "provider::post_state", ?storage_id, ?slot, ?old_value, "Storage changed"); storage_changeset_cursor.append_dup( @@ -576,6 +581,10 @@ impl PostState { for (block_number, account_changes) in std::mem::take(&mut self.account_changes).inner.into_iter() { + if self.prune_modes.should_prune_account_history(block_number, tip) { + continue + } + for (address, info) in account_changes.into_iter() { tracing::trace!(target: "provider::post_state", block_number, ?address, old = ?info, "Account changed"); account_changeset_cursor @@ -592,7 +601,7 @@ impl PostState { tx: &TX, tip: BlockNumber, ) -> Result<(), DbError> { - self.write_history_to_db(tx)?; + self.write_history_to_db(tx, tip)?; // Write new storage state tracing::trace!(target: "provider::post_state", len = self.storage.len(), "Writing new storage state"); @@ -644,12 +653,12 @@ impl PostState { // Write the receipts of the transactions if not pruned tracing::trace!(target: "provider::post_state", len = self.receipts.len(), "Writing receipts"); - if !self.receipts.is_empty() && self.prune_targets.receipts != Some(PruneMode::Full) { + if !self.receipts.is_empty() && self.prune_modes.receipts != Some(PruneMode::Full) { let mut bodies_cursor = tx.cursor_read::()?; let mut receipts_cursor = tx.cursor_write::()?; for (block, receipts) in self.receipts { - if self.prune_targets.should_prune_receipts(block, tip) { + if self.prune_modes.should_prune_receipts(block, tip) { continue }