diff --git a/Cargo.lock b/Cargo.lock index b2c14b5d47..6d2141a46a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5588,6 +5588,7 @@ version = "0.1.0-alpha.4" dependencies = [ "assert_matches", "itertools 0.10.5", + "rayon", "reth-db", "reth-interfaces", "reth-primitives", diff --git a/crates/prune/Cargo.toml b/crates/prune/Cargo.toml index 432a0ecfdd..8db44ce6b3 100644 --- a/crates/prune/Cargo.toml +++ b/crates/prune/Cargo.toml @@ -21,6 +21,7 @@ reth-interfaces = { workspace = true } tracing = { workspace = true } thiserror = { workspace = true } itertools = "0.10" +rayon = "1.6.0" [dev-dependencies] # reth diff --git a/crates/prune/src/error.rs b/crates/prune/src/error.rs index a38e3d6e5d..fdc0af4484 100644 --- a/crates/prune/src/error.rs +++ b/crates/prune/src/error.rs @@ -4,6 +4,9 @@ use thiserror::Error; #[derive(Error, Debug)] pub enum PrunerError { + #[error("Inconsistent data: {0}")] + InconsistentData(&'static str), + #[error("An interface error occurred.")] Interface(#[from] reth_interfaces::Error), diff --git a/crates/prune/src/pruner.rs b/crates/prune/src/pruner.rs index 400d0bda22..de020dcd27 100644 --- a/crates/prune/src/pruner.rs +++ b/crates/prune/src/pruner.rs @@ -1,10 +1,16 @@ //! Support for pruning. 
use crate::PrunerError; +use rayon::prelude::*; use reth_db::{database::Database, tables}; -use reth_primitives::{BlockNumber, ChainSpec, PruneCheckpoint, PruneMode, PruneModes, PrunePart}; -use reth_provider::{BlockReader, DatabaseProviderRW, ProviderFactory, PruneCheckpointWriter}; -use std::sync::Arc; +use reth_primitives::{ + BlockNumber, ChainSpec, PruneCheckpoint, PruneMode, PruneModes, PrunePart, TxNumber, +}; +use reth_provider::{ + BlockReader, DatabaseProviderRW, ProviderFactory, PruneCheckpointReader, PruneCheckpointWriter, + TransactionsProvider, +}; +use std::{ops::RangeInclusive, sync::Arc}; use tracing::{debug, instrument, trace}; /// Result of [Pruner::run] execution @@ -15,11 +21,12 @@ pub type PrunerWithResult = (Pruner, PrunerResult); pub struct BatchSizes { receipts: usize, + transaction_lookup: usize, } impl Default for BatchSizes { fn default() -> Self { - Self { receipts: 10000 } + Self { receipts: 10000, transaction_lookup: 10000 } } } @@ -70,6 +77,12 @@ impl Pruner { self.prune_receipts(&provider, to_block, prune_mode)?; } + if let Some((to_block, prune_mode)) = + self.modes.prune_target_block_transaction_lookup(tip_block_number) + { + self.prune_transaction_lookup(&provider, to_block, prune_mode)?; + } + provider.commit()?; self.last_pruned_block_number = Some(tip_block_number); @@ -97,6 +110,37 @@ impl Pruner { } } + /// Get next inclusive tx number range to prune according to the checkpoint and `to_block` block + /// number. + /// + /// To get the range start: + /// 1. If checkpoint exists, get next block body and return its first tx number. + /// 2. If checkpoint doesn't exist, return 0. + /// + /// To get the range end: get last tx number for the provided `to_block`. + fn get_next_tx_num_range_from_checkpoint( + &self, + provider: &DatabaseProviderRW<'_, DB>, + prune_part: PrunePart, + to_block: BlockNumber, + ) -> reth_interfaces::Result>> { + let from_tx_num = provider + .get_prune_checkpoint(prune_part)? 
+ .map(|checkpoint| provider.block_body_indices(checkpoint.block_number + 1)) + .transpose()? + .flatten() + .map(|body| body.first_tx_num) + .unwrap_or_default(); + + let to_tx_num = match provider.block_body_indices(to_block)? { + Some(body) => body, + None => return Ok(None), + } + .last_tx_num(); + + Ok(Some(from_tx_num..=to_tx_num)) + } + /// Prune receipts up to the provided block, inclusive. #[instrument(level = "trace", skip(self, provider), target = "pruner")] fn prune_receipts( @@ -105,16 +149,20 @@ impl Pruner { to_block: BlockNumber, prune_mode: PruneMode, ) -> PrunerResult { - let to_block_body = match provider.block_body_indices(to_block)? { - Some(body) => body, + let range = match self.get_next_tx_num_range_from_checkpoint( + provider, + PrunePart::Receipts, + to_block, + )? { + Some(range) => range, None => { trace!(target: "pruner", "No receipts to prune"); return Ok(()) } }; - provider.prune_table_in_batches::( - ..=to_block_body.last_tx_num(), + provider.prune_table_in_batches::( + range, self.batch_sizes.receipts, |receipts| { trace!( @@ -132,6 +180,71 @@ impl Pruner { Ok(()) } + + /// Prune transaction lookup entries up to the provided block, inclusive. + #[instrument(level = "trace", skip(self, provider), target = "pruner")] + fn prune_transaction_lookup( + &self, + provider: &DatabaseProviderRW<'_, DB>, + to_block: BlockNumber, + prune_mode: PruneMode, + ) -> PrunerResult { + let range = match self.get_next_tx_num_range_from_checkpoint( + provider, + PrunePart::TransactionLookup, + to_block, + )? { + Some(range) => range, + None => { + trace!(target: "pruner", "No transaction lookup entries to prune"); + return Ok(()) + } + }; + let last_tx_num = *range.end(); + + for i in range.step_by(self.batch_sizes.transaction_lookup) { + // The `min` ensures that the transaction range doesn't exceed the last transaction + // number. `last_tx_num + 1` is used to include the last transaction in the range. 
+ let tx_range = i..(i + self.batch_sizes.transaction_lookup as u64).min(last_tx_num + 1); + + // Retrieve transactions in the range and calculate their hashes in parallel + let mut hashes = provider + .transactions_by_tx_range(tx_range.clone())? + .into_par_iter() + .map(|transaction| transaction.hash()) + .collect::>(); + + // Number of transactions retrieved from the database should match the tx range count + let tx_count = tx_range.clone().count(); + if hashes.len() != tx_count { + return Err(PrunerError::InconsistentData( + "Unexpected number of transaction hashes retrieved by transaction number range", + )) + } + + // Pre-sort hashes to prune them in order + hashes.sort_unstable(); + + provider.prune_table_in_batches::( + hashes, + self.batch_sizes.transaction_lookup, + |entries| { + trace!( + target: "pruner", + %entries, + "Pruned transaction lookup" + ); + }, + )?; + } + + provider.save_prune_checkpoint( + PrunePart::TransactionLookup, + PruneCheckpoint { block_number: to_block, prune_mode }, + )?; + + Ok(()) + } } #[cfg(test)] @@ -143,7 +256,9 @@ mod tests { generators, generators::{random_block_range, random_receipt}, }; - use reth_primitives::{PruneCheckpoint, PruneMode, PruneModes, PrunePart, H256, MAINNET}; + use reth_primitives::{ + BlockNumber, PruneCheckpoint, PruneMode, PruneModes, PrunePart, H256, MAINNET, + }; use reth_provider::PruneCheckpointReader; use reth_stages::test_utils::TestTransaction; @@ -192,34 +307,103 @@ mod tests { tx.table::().unwrap().len() ); - let prune_to_block = 10; - let prune_mode = PruneMode::Before(prune_to_block); - let pruner = Pruner::new( - tx.inner_raw(), - MAINNET.clone(), - 5, - 0, - PruneModes { receipts: Some(prune_mode), ..Default::default() }, - BatchSizes { - // Less than total amount of blocks to prune to test the batching logic - receipts: 10, - }, - ); + let test_prune = |to_block: BlockNumber| { + let prune_mode = PruneMode::Before(to_block); + let pruner = Pruner::new( + tx.inner_raw(), + MAINNET.clone(), + 5, 
0, + PruneModes { receipts: Some(prune_mode), ..Default::default() }, + BatchSizes { + // Less than total amount of blocks to prune to test the batching logic + receipts: 10, + ..Default::default() + }, + ); - let provider = tx.inner_rw(); - assert_matches!(pruner.prune_receipts(&provider, prune_to_block, prune_mode), Ok(())); - provider.commit().expect("commit"); + let provider = tx.inner_rw(); + assert_matches!(pruner.prune_receipts(&provider, to_block, prune_mode), Ok(())); + provider.commit().expect("commit"); + + assert_eq!( + tx.table::().unwrap().len(), + blocks[to_block as usize + 1..].iter().map(|block| block.body.len()).sum::() + ); + assert_eq!( + tx.inner().get_prune_checkpoint(PrunePart::Receipts).unwrap(), + Some(PruneCheckpoint { block_number: to_block, prune_mode }) + ); + }; + + // Pruning first time ever, no previous checkpoint is present + test_prune(10); + // Prune second time, previous checkpoint is present, should continue pruning from where + // ended last time + test_prune(20); + } + + #[test] + fn prune_transaction_lookup() { + let tx = TestTransaction::default(); + let mut rng = generators::rng(); + + let blocks = random_block_range(&mut rng, 0..=100, H256::zero(), 0..10); + tx.insert_blocks(blocks.iter(), None).expect("insert blocks"); + + let mut tx_hash_numbers = Vec::new(); + for block in &blocks { + for transaction in &block.body { + tx_hash_numbers.push((transaction.hash, tx_hash_numbers.len() as u64)); + } + } + tx.insert_tx_hash_numbers(tx_hash_numbers).expect("insert tx hash numbers"); assert_eq!( - tx.table::().unwrap().len(), - blocks[prune_to_block as usize + 1..] 
- .iter() - .map(|block| block.body.len()) - .sum::() + tx.table::().unwrap().len(), + blocks.iter().map(|block| block.body.len()).sum::() ); assert_eq!( - tx.inner().get_prune_checkpoint(PrunePart::Receipts).unwrap(), - Some(PruneCheckpoint { block_number: prune_to_block, prune_mode }) + tx.table::().unwrap().len(), + tx.table::().unwrap().len() ); + + let test_prune = |to_block: BlockNumber| { + let prune_mode = PruneMode::Before(to_block); + let pruner = Pruner::new( + tx.inner_raw(), + MAINNET.clone(), + 5, + 0, + PruneModes { transaction_lookup: Some(prune_mode), ..Default::default() }, + BatchSizes { + // Less than total amount of blocks to prune to test the batching logic + transaction_lookup: 10, + ..Default::default() + }, + ); + + let provider = tx.inner_rw(); + assert_matches!( + pruner.prune_transaction_lookup(&provider, to_block, prune_mode), + Ok(()) + ); + provider.commit().expect("commit"); + + assert_eq!( + tx.table::().unwrap().len(), + blocks[to_block as usize + 1..].iter().map(|block| block.body.len()).sum::() + ); + assert_eq!( + tx.inner().get_prune_checkpoint(PrunePart::TransactionLookup).unwrap(), + Some(PruneCheckpoint { block_number: to_block, prune_mode }) + ); + }; + + // Pruning first time ever, no previous checkpoint is present + test_prune(10); + // Prune second time, previous checkpoint is present, should continue pruning from where + // ended last time + test_prune(20); } } diff --git a/crates/stages/src/test_utils/test_db.rs b/crates/stages/src/test_utils/test_db.rs index 4efb8debff..425cf1d712 100644 --- a/crates/stages/src/test_utils/test_db.rs +++ b/crates/stages/src/test_utils/test_db.rs @@ -11,7 +11,7 @@ use reth_db::{ }; use reth_primitives::{ keccak256, Account, Address, BlockNumber, Receipt, SealedBlock, SealedHeader, StorageEntry, - TxNumber, H256, MAINNET, U256, + TxHash, TxNumber, H256, MAINNET, U256, }; use reth_provider::{DatabaseProviderRO, DatabaseProviderRW, ProviderFactory}; use std::{ @@ -268,6 +268,18 @@ impl 
TestTransaction { }) } + pub fn insert_tx_hash_numbers(&self, tx_hash_numbers: I) -> Result<(), DbError> + where + I: IntoIterator, + { + self.commit(|tx| { + tx_hash_numbers.into_iter().try_for_each(|(tx_hash, tx_num)| { + // Insert into tx hash numbers table. + tx.put::(tx_hash, tx_num) + }) + }) + } + /// Insert collection of ([TxNumber], [Receipt]) into the corresponding table. pub fn insert_receipts(&self, receipts: I) -> Result<(), DbError> where diff --git a/crates/storage/provider/src/providers/database/provider.rs b/crates/storage/provider/src/providers/database/provider.rs index 339807580e..1f871cfa29 100644 --- a/crates/storage/provider/src/providers/database/provider.rs +++ b/crates/storage/provider/src/providers/database/provider.rs @@ -619,40 +619,39 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> DatabaseProvider<'this, TX> { Ok(()) } - /// Prune the table for the specified key range. + /// Prune the table for the specified pre-sorted key iterator. /// Returns number of rows pruned. pub fn prune_table( &self, - range: impl RangeBounds, + keys: impl IntoIterator, ) -> std::result::Result where T: Table, K: Key, { - self.prune_table_in_batches::(range, usize::MAX, |_| {}) + self.prune_table_in_batches::(keys, usize::MAX, |_| {}) } - /// Prune the table for the specified key range calling `chunk_callback` after every - /// `batch_size` pruned rows. + /// Prune the table for the specified pre-sorted key iterator, calling `chunk_callback` after + /// every `batch_size` pruned rows. /// /// Returns number of rows pruned. 
- pub fn prune_table_in_batches( + pub fn prune_table_in_batches( &self, - range: impl RangeBounds, + keys: impl IntoIterator, batch_size: usize, - batch_callback: F, + batch_callback: impl Fn(usize), ) -> std::result::Result where T: Table, K: Key, - F: Fn(usize), { let mut cursor = self.tx.cursor_write::()?; - let mut walker = cursor.walk_range(range)?; let mut deleted = 0; - while let Some(Ok(_)) = walker.next() { - walker.delete_current()?; + for key in keys { + cursor.seek_exact(key)?; + cursor.delete_current()?; deleted += 1; if deleted % batch_size == 0 {