diff --git a/crates/primitives/src/prune/target.rs b/crates/primitives/src/prune/target.rs index af6781897c..52e0806d33 100644 --- a/crates/primitives/src/prune/target.rs +++ b/crates/primitives/src/prune/target.rs @@ -10,7 +10,12 @@ use serde::{Deserialize, Serialize}; #[serde(default)] pub struct PruneModes { /// Sender Recovery pruning configuration. - #[serde(skip_serializing_if = "Option::is_none")] + // TODO(alexey): removing min blocks restriction is possible if we start calculating the senders + // dynamically on blockchain tree unwind. + #[serde( + skip_serializing_if = "Option::is_none", + deserialize_with = "deserialize_opt_prune_mode_with_min_blocks::<64, _>" + )] pub sender_recovery: Option, /// Transaction Lookup pruning configuration. #[serde(skip_serializing_if = "Option::is_none")] @@ -126,7 +131,7 @@ impl PruneModes { } impl_prune_parts!( - (sender_recovery, "SenderRecovery", None), + (sender_recovery, "SenderRecovery", Some(64)), (transaction_lookup, "TransactionLookup", None), (receipts, "Receipts", Some(64)), (account_history, "AccountHistory", Some(64)), diff --git a/crates/prune/src/pruner.rs b/crates/prune/src/pruner.rs index 3dae8bcb04..4a50faf268 100644 --- a/crates/prune/src/pruner.rs +++ b/crates/prune/src/pruner.rs @@ -22,11 +22,12 @@ pub type PrunerWithResult = (Pruner, PrunerResult); pub struct BatchSizes { receipts: usize, transaction_lookup: usize, + transaction_senders: usize, } impl Default for BatchSizes { fn default() -> Self { - Self { receipts: 10000, transaction_lookup: 10000 } + Self { receipts: 10000, transaction_lookup: 10000, transaction_senders: 10000 } } } @@ -83,6 +84,12 @@ impl Pruner { self.prune_transaction_lookup(&provider, to_block, prune_mode)?; } + if let Some((to_block, prune_mode)) = + self.modes.prune_target_block_sender_recovery(tip_block_number)? + { + self.prune_transaction_senders(&provider, to_block, prune_mode)?; + } + provider.commit()?; self.last_pruned_block_number = Some(tip_block_number); @@ -124,13 +131,16 @@ impl Pruner { prune_part: PrunePart, to_block: BlockNumber, ) -> reth_interfaces::Result>> { - let from_tx_num = provider - .get_prune_checkpoint(prune_part)? - .map(|checkpoint| provider.block_body_indices(checkpoint.block_number + 1)) - .transpose()? - .flatten() - .map(|body| body.first_tx_num) - .unwrap_or_default(); + let checkpoint = provider.get_prune_checkpoint(prune_part)?.unwrap_or(PruneCheckpoint { + block_number: 0, // No checkpoint, fresh pruning + prune_mode: PruneMode::Full, // Doesn't matter in this case, can be anything + }); + // Get first transaction of the next block after the highest pruned one + let from_tx_num = + provider.block_body_indices(checkpoint.block_number + 1)?.map(|body| body.first_tx_num); + // If no block body index is found, the DB is either corrupted or we've already pruned up to + // the latest block, so there's no thing to prune now. + let Some(from_tx_num) = from_tx_num else { return Ok(None) }; let to_tx_num = match provider.block_body_indices(to_block)? { Some(body) => body, @@ -200,7 +210,7 @@ impl Pruner { )? { Some(range) => range, None => { - trace!(target: "pruner", "No receipts to prune"); + trace!(target: "pruner", "No transaction lookup entries to prune"); return Ok(()) } }; @@ -248,6 +258,50 @@ impl Pruner { Ok(()) } + + /// Prune transaction senders up to the provided block, inclusive. + #[instrument(level = "trace", skip(self, provider), target = "pruner")] + fn prune_transaction_senders( + &self, + provider: &DatabaseProviderRW<'_, DB>, + to_block: BlockNumber, + prune_mode: PruneMode, + ) -> PrunerResult { + let range = match self.get_next_tx_num_range_from_checkpoint( + provider, + PrunePart::SenderRecovery, + to_block, + )? { + Some(range) => range, + None => { + trace!(target: "pruner", "No transaction senders to prune"); + return Ok(()) + } + }; + let total = range.clone().count(); + + let mut processed = 0; + provider.prune_table_in_batches::( + range, + self.batch_sizes.transaction_senders, + |entries| { + processed += entries; + trace!( + target: "pruner", + %entries, + progress = format!("{:.1}%", 100.0 * processed as f64 / total as f64), + "Pruned transaction senders" + ); + }, + )?; + + provider.save_prune_checkpoint( + PrunePart::SenderRecovery, + PruneCheckpoint { block_number: to_block, prune_mode }, + )?; + + Ok(()) + } } #[cfg(test)] @@ -409,4 +463,71 @@ mod tests { // ended last time test_prune(20); } + + #[test] + fn prune_transaction_senders() { + let tx = TestTransaction::default(); + let mut rng = generators::rng(); + + let blocks = random_block_range(&mut rng, 0..=100, H256::zero(), 0..10); + tx.insert_blocks(blocks.iter(), None).expect("insert blocks"); + + let mut transaction_senders = Vec::new(); + for block in &blocks { + for transaction in &block.body { + transaction_senders.push(( + transaction_senders.len() as u64, + transaction.recover_signer().expect("recover signer"), + )); + } + } + tx.insert_transaction_senders(transaction_senders).expect("insert transaction senders"); + + assert_eq!( + tx.table::().unwrap().len(), + blocks.iter().map(|block| block.body.len()).sum::() + ); + assert_eq!( + tx.table::().unwrap().len(), + tx.table::().unwrap().len() + ); + + let test_prune = |to_block: BlockNumber| { + let prune_mode = PruneMode::Before(to_block); + let pruner = Pruner::new( + tx.inner_raw(), + MAINNET.clone(), + 5, + 0, + PruneModes { sender_recovery: Some(prune_mode), ..Default::default() }, + BatchSizes { + // Less than total amount of blocks to prune to test the batching logic + transaction_senders: 10, + ..Default::default() + }, + ); + + let provider = tx.inner_rw(); + assert_matches!( + pruner.prune_transaction_senders(&provider, to_block, prune_mode), + Ok(()) + ); + provider.commit().expect("commit"); + + assert_eq!( + tx.table::().unwrap().len(), + blocks[to_block as usize + 1..].iter().map(|block| block.body.len()).sum::() + ); + assert_eq!( + tx.inner().get_prune_checkpoint(PrunePart::SenderRecovery).unwrap(), + Some(PruneCheckpoint { block_number: to_block, prune_mode }) + ); + }; + + // Pruning first time ever, no previous checkpoint is present + test_prune(10); + // Prune second time, previous checkpoint is present, should continue pruning from where + // ended last time + test_prune(20); + } } diff --git a/crates/stages/src/stages/sender_recovery.rs b/crates/stages/src/stages/sender_recovery.rs index 8872c8138f..9f72c69d6e 100644 --- a/crates/stages/src/stages/sender_recovery.rs +++ b/crates/stages/src/stages/sender_recovery.rs @@ -11,9 +11,11 @@ use reth_interfaces::consensus; use reth_primitives::{ keccak256, stage::{EntitiesCheckpoint, StageCheckpoint, StageId}, - TransactionSignedNoHash, TxNumber, H160, + PrunePart, TransactionSignedNoHash, TxNumber, H160, +}; +use reth_provider::{ + BlockReader, DatabaseProviderRW, HeaderProvider, ProviderError, PruneCheckpointReader, }; -use reth_provider::{BlockReader, DatabaseProviderRW, HeaderProvider, ProviderError}; use std::fmt::Debug; use thiserror::Error; use tokio::sync::mpsc; @@ -207,9 +209,20 @@ fn recover_sender( fn stage_checkpoint( provider: &DatabaseProviderRW<'_, &DB>, -) -> Result { +) -> Result { + let pruned_entries = provider + .get_prune_checkpoint(PrunePart::SenderRecovery)? + .map(|checkpoint| provider.block_body_indices(checkpoint.block_number)) + .transpose()? + .flatten() + // +1 is needed because TxNumber is 0-indexed + .map(|body| body.last_tx_num() + 1) + .unwrap_or_default(); Ok(EntitiesCheckpoint { - processed: provider.tx_ref().entries::()? as u64, + // If `TxSenders` table was pruned, we will have a number of entries in it not matching + // the actual number of processed transactions. To fix that, we add the number of pruned + // `TxSenders` entries. + processed: provider.tx_ref().entries::()? as u64 + pruned_entries, total: provider.tx_ref().entries::()? as u64, }) } @@ -239,9 +252,10 @@ mod tests { generators::{random_block, random_block_range}, }; use reth_primitives::{ - stage::StageUnitCheckpoint, BlockNumber, SealedBlock, TransactionSigned, H256, + stage::StageUnitCheckpoint, BlockNumber, PruneCheckpoint, PruneMode, SealedBlock, + TransactionSigned, H256, MAINNET, }; - use reth_provider::TransactionsProvider; + use reth_provider::{ProviderFactory, PruneCheckpointWriter, TransactionsProvider}; use super::*; use crate::test_utils::{ @@ -366,6 +380,58 @@ mod tests { assert!(runner.validate_execution(first_input, result.ok()).is_ok(), "validation failed"); } + #[test] + fn stage_checkpoint_pruned() { + let tx = TestTransaction::default(); + let mut rng = generators::rng(); + + let blocks = random_block_range(&mut rng, 0..=100, H256::zero(), 0..10); + tx.insert_blocks(blocks.iter(), None).expect("insert blocks"); + + let max_pruned_block = 30; + let max_processed_block = 70; + + let mut tx_senders = Vec::new(); + let mut tx_number = 0; + for block in &blocks[..=max_processed_block] { + for transaction in &block.body { + if block.number > max_pruned_block { + tx_senders + .push((tx_number, transaction.recover_signer().expect("recover signer"))); + } + tx_number += 1; + } + } + tx.insert_transaction_senders(tx_senders).expect("insert tx hash numbers"); + + let provider = tx.inner_rw(); + provider + .save_prune_checkpoint( + PrunePart::SenderRecovery, + PruneCheckpoint { + block_number: max_pruned_block as BlockNumber, + prune_mode: PruneMode::Full, + }, + ) + .expect("save stage checkpoint"); + provider.commit().expect("commit"); + + let db = tx.inner_raw(); + let factory = ProviderFactory::new(db.as_ref(), MAINNET.clone()); + let provider = factory.provider_rw().expect("provider rw"); + + assert_eq!( + stage_checkpoint(&provider).expect("stage checkpoint"), + EntitiesCheckpoint { + processed: blocks[..=max_processed_block] + .iter() + .map(|block| block.body.len() as u64) + .sum::(), + total: blocks.iter().map(|block| block.body.len() as u64).sum::() + } + ); + } + struct SenderRecoveryTestRunner { tx: TestTransaction, threshold: u64, diff --git a/crates/stages/src/test_utils/test_db.rs b/crates/stages/src/test_utils/test_db.rs index 425cf1d712..a4df2b207f 100644 --- a/crates/stages/src/test_utils/test_db.rs +++ b/crates/stages/src/test_utils/test_db.rs @@ -293,6 +293,18 @@ impl TestTransaction { }) } + pub fn insert_transaction_senders(&self, transaction_senders: I) -> Result<(), DbError> + where + I: IntoIterator, + { + self.commit(|tx| { + transaction_senders.into_iter().try_for_each(|(tx_num, sender)| { + // Insert into receipts table. + tx.put::(tx_num, sender) + }) + }) + } + /// Insert collection of ([Address], [Account]) into corresponding tables. pub fn insert_accounts_and_storages(&self, accounts: I) -> Result<(), DbError> where