From edf31806d886c8b0e8bacb572bd19923de274190 Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Thu, 31 Aug 2023 11:27:07 +0100 Subject: [PATCH] feat(stages): respect `PruneModes` in Index History stages (#4382) --- bin/reth/src/node/mod.rs | 6 +- .../src/stages/index_account_history.rs | 114 ++++++++++++++++-- .../src/stages/index_storage_history.rs | 113 +++++++++++++++-- crates/stages/src/stages/mod.rs | 39 ++++-- 4 files changed, 234 insertions(+), 38 deletions(-) diff --git a/bin/reth/src/node/mod.rs b/bin/reth/src/node/mod.rs index b85ff1c68a..f06baaaf25 100644 --- a/bin/reth/src/node/mod.rs +++ b/bin/reth/src/node/mod.rs @@ -817,6 +817,8 @@ impl NodeCommand { let factory = factory.with_stack_config(stack_config); + let prune_modes = prune_config.map(|prune| prune.parts).unwrap_or_default(); + let header_mode = if continuous { HeaderSyncMode::Continuous } else { HeaderSyncMode::Tip(tip_rx) }; let pipeline = builder @@ -849,7 +851,7 @@ impl NodeCommand { .clean_threshold .max(stage_config.account_hashing.clean_threshold) .max(stage_config.storage_hashing.clean_threshold), - prune_config.map(|prune| prune.parts).unwrap_or_default(), + prune_modes.clone(), ) .with_metrics_tx(metrics_tx), ) @@ -865,9 +867,11 @@ impl NodeCommand { .set(TransactionLookupStage::new(stage_config.transaction_lookup.commit_threshold)) .set(IndexAccountHistoryStage::new( stage_config.index_account_history.commit_threshold, + prune_modes.clone(), )) .set(IndexStorageHistoryStage::new( stage_config.index_storage_history.commit_threshold, + prune_modes, )), ) .build(db, self.chain.clone()); diff --git a/crates/stages/src/stages/index_account_history.rs b/crates/stages/src/stages/index_account_history.rs index c13123d086..b79af45d1e 100644 --- a/crates/stages/src/stages/index_account_history.rs +++ b/crates/stages/src/stages/index_account_history.rs @@ -1,33 +1,37 @@ use crate::{ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput}; use reth_db::database::Database; -use reth_primitives::stage::{StageCheckpoint, StageId}; -use reth_provider::{AccountExtReader, DatabaseProviderRW, HistoryWriter}; +use reth_primitives::{ + stage::{StageCheckpoint, StageId}, + PruneCheckpoint, PruneModes, PrunePart, +}; +use reth_provider::{ + AccountExtReader, DatabaseProviderRW, HistoryWriter, PruneCheckpointReader, + PruneCheckpointWriter, +}; use std::fmt::Debug; /// Stage is indexing history the account changesets generated in /// [`ExecutionStage`][crate::stages::ExecutionStage]. For more information /// on index sharding take a look at [`reth_db::tables::AccountHistory`] -/// -/// Pruning: we don't need to store and act on [`reth_primitives::PruneModes`], -/// because this stage indexes the already pruned account changesets generated by -/// [`crate::stages::ExecutionStage`]. #[derive(Debug)] pub struct IndexAccountHistoryStage { /// Number of blocks after which the control /// flow will be returned to the pipeline for commit. pub commit_threshold: u64, + /// Pruning configuration. + pub prune_modes: PruneModes, } impl IndexAccountHistoryStage { /// Create new instance of [IndexAccountHistoryStage]. - pub fn new(commit_threshold: u64) -> Self { - Self { commit_threshold } + pub fn new(commit_threshold: u64, prune_modes: PruneModes) -> Self { + Self { commit_threshold, prune_modes } } } impl Default for IndexAccountHistoryStage { fn default() -> Self { - Self { commit_threshold: 100_000 } + Self { commit_threshold: 100_000, prune_modes: PruneModes::none() } } } @@ -42,8 +46,29 @@ impl Stage for IndexAccountHistoryStage { async fn execute( &mut self, provider: &DatabaseProviderRW<'_, &DB>, - input: ExecInput, + mut input: ExecInput, ) -> Result { + if let Some((target_prunable_block, prune_mode)) = + self.prune_modes.prune_target_block_account_history(input.target())? + { + if target_prunable_block > input.checkpoint().block_number { + input.checkpoint = Some(StageCheckpoint::new(target_prunable_block)); + + // Save prune checkpoint only if we don't have one already. + // Otherwise, pruner may skip the unpruned range of blocks. + if provider.get_prune_checkpoint(PrunePart::AccountHistory)?.is_none() { + provider.save_prune_checkpoint( + PrunePart::AccountHistory, + PruneCheckpoint { + block_number: Some(target_prunable_block), + tx_number: None, + prune_mode, + }, + )?; + } + } + } + if input.target_reached() { return Ok(ExecOutput::done(input.checkpoint())) } @@ -98,7 +123,7 @@ mod tests { generators, generators::{random_block_range, random_changeset_range, random_contract_account_range}, }; - use reth_primitives::{hex_literal::hex, Address, BlockNumber, H160, H256, MAINNET}; + use reth_primitives::{hex_literal::hex, Address, BlockNumber, PruneMode, H160, H256, MAINNET}; const ADDRESS: H160 = H160(hex!("0000000000000000000000000000000000000001")); @@ -371,16 +396,76 @@ mod tests { ); } + #[tokio::test] + async fn insert_index_with_prune_modes() { + // init + let tx = TestTransaction::default(); + + // setup + tx.commit(|tx| { + // we just need first and last + tx.put::( + 0, + StoredBlockBodyIndices { tx_count: 3, ..Default::default() }, + ) + .unwrap(); + + tx.put::( + 100, + StoredBlockBodyIndices { tx_count: 5, ..Default::default() }, + ) + .unwrap(); + + // setup changeset that are going to be applied to history index + tx.put::(20, acc()).unwrap(); + tx.put::(36, acc()).unwrap(); + tx.put::(100, acc()).unwrap(); + Ok(()) + }) + .unwrap(); + + // run + let input = ExecInput { target: Some(100), ..Default::default() }; + let mut stage = IndexAccountHistoryStage { + prune_modes: PruneModes { + account_history: Some(PruneMode::Before(36)), + ..Default::default() + }, + ..Default::default() + }; + let factory = ProviderFactory::new(tx.tx.as_ref(), MAINNET.clone()); + let provider = factory.provider_rw().unwrap(); + let out = stage.execute(&provider, input).await.unwrap(); + assert_eq!(out, ExecOutput { checkpoint: StageCheckpoint::new(100), done: true }); + provider.commit().unwrap(); + + // verify + let table = cast(tx.table::().unwrap()); + assert_eq!(table, BTreeMap::from([(shard(u64::MAX), vec![36, 100])])); + + // unwind + unwind(&tx, 100, 0).await; + + // verify initial state + let table = tx.table::().unwrap(); + assert!(table.is_empty()); + } + stage_test_suite_ext!(IndexAccountHistoryTestRunner, index_account_history); struct IndexAccountHistoryTestRunner { pub(crate) tx: TestTransaction, commit_threshold: u64, + prune_modes: PruneModes, } impl Default for IndexAccountHistoryTestRunner { fn default() -> Self { - Self { tx: TestTransaction::default(), commit_threshold: 1000 } + Self { + tx: TestTransaction::default(), + commit_threshold: 1000, + prune_modes: PruneModes::none(), + } } } @@ -392,7 +477,10 @@ mod tests { } fn stage(&self) -> Self::S { - Self::S { commit_threshold: self.commit_threshold } + Self::S { + commit_threshold: self.commit_threshold, + prune_modes: self.prune_modes.clone(), + } } } diff --git a/crates/stages/src/stages/index_storage_history.rs b/crates/stages/src/stages/index_storage_history.rs index 0f077fac58..8d03bb20d5 100644 --- a/crates/stages/src/stages/index_storage_history.rs +++ b/crates/stages/src/stages/index_storage_history.rs @@ -1,33 +1,36 @@ use crate::{ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput}; use reth_db::{database::Database, models::BlockNumberAddress}; -use reth_primitives::stage::{StageCheckpoint, StageId}; -use reth_provider::{DatabaseProviderRW, HistoryWriter, StorageReader}; +use reth_primitives::{ + stage::{StageCheckpoint, StageId}, + PruneCheckpoint, PruneModes, PrunePart, +}; +use reth_provider::{ + DatabaseProviderRW, HistoryWriter, PruneCheckpointReader, PruneCheckpointWriter, StorageReader, +}; use std::fmt::Debug; /// Stage is indexing history the account changesets generated in /// [`ExecutionStage`][crate::stages::ExecutionStage]. For more information /// on index sharding take a look at [`reth_db::tables::StorageHistory`]. -/// -/// Pruning: we don't need to store and act on [`reth_primitives::PruneModes`], -/// because this stage indexes the already pruned storage changesets generated by -/// [`crate::stages::ExecutionStage`]. #[derive(Debug)] pub struct IndexStorageHistoryStage { /// Number of blocks after which the control /// flow will be returned to the pipeline for commit. pub commit_threshold: u64, + /// Pruning configuration. + pub prune_modes: PruneModes, } impl IndexStorageHistoryStage { /// Create new instance of [IndexStorageHistoryStage]. - pub fn new(commit_threshold: u64) -> Self { - Self { commit_threshold } + pub fn new(commit_threshold: u64, prune_modes: PruneModes) -> Self { + Self { commit_threshold, prune_modes } } } impl Default for IndexStorageHistoryStage { fn default() -> Self { - Self { commit_threshold: 100_000 } + Self { commit_threshold: 100_000, prune_modes: PruneModes::none() } } } @@ -42,8 +45,29 @@ impl Stage for IndexStorageHistoryStage { async fn execute( &mut self, provider: &DatabaseProviderRW<'_, &DB>, - input: ExecInput, + mut input: ExecInput, ) -> Result { + if let Some((target_prunable_block, prune_mode)) = + self.prune_modes.prune_target_block_storage_history(input.target())? + { + if target_prunable_block > input.checkpoint().block_number { + input.checkpoint = Some(StageCheckpoint::new(target_prunable_block)); + + // Save prune checkpoint only if we don't have one already. + // Otherwise, pruner may skip the unpruned range of blocks. + if provider.get_prune_checkpoint(PrunePart::StorageHistory)?.is_none() { + provider.save_prune_checkpoint( + PrunePart::StorageHistory, + PruneCheckpoint { + block_number: Some(target_prunable_block), + tx_number: None, + prune_mode, + }, + )?; + } + } + } + if input.target_reached() { return Ok(ExecOutput::done(input.checkpoint())) } @@ -98,7 +122,7 @@ mod tests { generators::{random_block_range, random_changeset_range, random_contract_account_range}, }; use reth_primitives::{ - hex_literal::hex, Address, BlockNumber, StorageEntry, H160, H256, MAINNET, U256, + hex_literal::hex, Address, BlockNumber, PruneMode, StorageEntry, H160, H256, MAINNET, U256, }; const ADDRESS: H160 = H160(hex!("0000000000000000000000000000000000000001")); @@ -385,16 +409,76 @@ mod tests { ); } + #[tokio::test] + async fn insert_index_with_prune_modes() { + // init + let tx = TestTransaction::default(); + + // setup + tx.commit(|tx| { + // we just need first and last + tx.put::( + 0, + StoredBlockBodyIndices { tx_count: 3, ..Default::default() }, + ) + .unwrap(); + + tx.put::( + 100, + StoredBlockBodyIndices { tx_count: 5, ..Default::default() }, + ) + .unwrap(); + + // setup changeset that are going to be applied to history index + tx.put::(trns(20), storage(STORAGE_KEY)).unwrap(); + tx.put::(trns(36), storage(STORAGE_KEY)).unwrap(); + tx.put::(trns(100), storage(STORAGE_KEY)).unwrap(); + Ok(()) + }) + .unwrap(); + + // run + let input = ExecInput { target: Some(100), ..Default::default() }; + let mut stage = IndexStorageHistoryStage { + prune_modes: PruneModes { + storage_history: Some(PruneMode::Before(36)), + ..Default::default() + }, + ..Default::default() + }; + let factory = ProviderFactory::new(tx.tx.as_ref(), MAINNET.clone()); + let provider = factory.provider_rw().unwrap(); + let out = stage.execute(&provider, input).await.unwrap(); + assert_eq!(out, ExecOutput { checkpoint: StageCheckpoint::new(100), done: true }); + provider.commit().unwrap(); + + // verify + let table = cast(tx.table::().unwrap()); + assert_eq!(table, BTreeMap::from([(shard(u64::MAX), vec![36, 100]),])); + + // unwind + unwind(&tx, 100, 0).await; + + // verify initial state + let table = tx.table::().unwrap(); + assert!(table.is_empty()); + } + stage_test_suite_ext!(IndexStorageHistoryTestRunner, index_storage_history); struct IndexStorageHistoryTestRunner { pub(crate) tx: TestTransaction, commit_threshold: u64, + prune_modes: PruneModes, } impl Default for IndexStorageHistoryTestRunner { fn default() -> Self { - Self { tx: TestTransaction::default(), commit_threshold: 1000 } + Self { + tx: TestTransaction::default(), + commit_threshold: 1000, + prune_modes: PruneModes::none(), + } } } @@ -406,7 +490,10 @@ mod tests { } fn stage(&self) -> Self::S { - Self::S { commit_threshold: self.commit_threshold } + Self::S { + commit_threshold: self.commit_threshold, + prune_modes: self.prune_modes.clone(), + } } } diff --git a/crates/stages/src/stages/mod.rs b/crates/stages/src/stages/mod.rs index 1adc72bdf9..87e045aee9 100644 --- a/crates/stages/src/stages/mod.rs +++ b/crates/stages/src/stages/mod.rs @@ -135,7 +135,7 @@ mod tests { Factory::new(Arc::new(ChainSpecBuilder::mainnet().berlin_activated().build())), ExecutionStageThresholds { max_blocks: Some(100), max_changes: None }, MERKLE_STAGE_DEFAULT_CLEAN_THRESHOLD, - prune_modes, + prune_modes.clone(), ); execution_stage.execute(&provider, input).await.unwrap(); @@ -155,19 +155,36 @@ mod tests { ); // Check AccountHistory - let mut acc_indexing_stage = IndexAccountHistoryStage::default(); - acc_indexing_stage.execute(&provider, input).await.unwrap(); - let mut account_history: Cursor<'_, RW, AccountHistory> = - provider.tx_ref().cursor_read::().unwrap(); - assert_eq!(account_history.walk(None).unwrap().count(), expect_num_acc_changesets); + let mut acc_indexing_stage = + IndexAccountHistoryStage { prune_modes: prune_modes.clone(), ..Default::default() }; + + if let Some(PruneMode::Full) = prune_modes.account_history { + // Full is not supported + assert!(acc_indexing_stage.execute(&provider, input).await.is_err()); + } else { + acc_indexing_stage.execute(&provider, input).await.unwrap(); + let mut account_history: Cursor<'_, RW, AccountHistory> = + provider.tx_ref().cursor_read::().unwrap(); + assert_eq!(account_history.walk(None).unwrap().count(), expect_num_acc_changesets); + } // Check StorageHistory - let mut storage_indexing_stage = IndexStorageHistoryStage::default(); - storage_indexing_stage.execute(&provider, input).await.unwrap(); + let mut storage_indexing_stage = + IndexStorageHistoryStage { prune_modes: prune_modes.clone(), ..Default::default() }; - let mut storage_history = - provider.tx_ref().cursor_read::().unwrap(); - assert_eq!(storage_history.walk(None).unwrap().count(), expect_num_storage_changesets); + if let Some(PruneMode::Full) = prune_modes.storage_history { + // Full is not supported + assert!(acc_indexing_stage.execute(&provider, input).await.is_err()); + } else { + storage_indexing_stage.execute(&provider, input).await.unwrap(); + + let mut storage_history = + provider.tx_ref().cursor_read::().unwrap(); + assert_eq!( + storage_history.walk(None).unwrap().count(), + expect_num_storage_changesets + ); + } }; // In an unpruned configuration there is 1 receipt, 3 changed accounts and 1 changed