From 74bbe5afa86ce1917a75e8f23cbee78d22457eff Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Wed, 26 Jul 2023 17:44:46 +0100 Subject: [PATCH] fix(stages): transaction lookup stage checkpoint calculation (#3909) --- crates/stages/src/stages/tx_lookup.rs | 79 +++++++++++++++++++++++++-- 1 file changed, 73 insertions(+), 6 deletions(-) diff --git a/crates/stages/src/stages/tx_lookup.rs b/crates/stages/src/stages/tx_lookup.rs index 09e0e6d674..211266d45d 100644 --- a/crates/stages/src/stages/tx_lookup.rs +++ b/crates/stages/src/stages/tx_lookup.rs @@ -11,9 +11,9 @@ use reth_db::{ use reth_primitives::{ keccak256, stage::{EntitiesCheckpoint, StageCheckpoint, StageId}, - TransactionSignedNoHash, TxNumber, H256, + PrunePart, TransactionSignedNoHash, TxNumber, H256, }; -use reth_provider::DatabaseProviderRW; +use reth_provider::{BlockReader, DatabaseProviderRW, PruneCheckpointReader}; use tokio::sync::mpsc; use tracing::*; @@ -183,9 +183,20 @@ fn calculate_hash( fn stage_checkpoint( provider: &DatabaseProviderRW<'_, &DB>, -) -> Result { +) -> Result { + let pruned_entries = provider + .get_prune_checkpoint(PrunePart::TransactionLookup)? + .map(|checkpoint| provider.block_body_indices(checkpoint.block_number)) + .transpose()? + .flatten() + // +1 is needed because TxNumber is 0-indexed + .map(|body| body.last_tx_num() + 1) + .unwrap_or_default(); Ok(EntitiesCheckpoint { - processed: provider.tx_ref().entries::()? as u64, + // If `TxHashNumber` table was pruned, we will have a number of entries in it not matching + // the actual number of processed transactions. To fix that, we add the number of pruned + // `TxHashNumber` entries. + processed: provider.tx_ref().entries::()? as u64 + pruned_entries, total: provider.tx_ref().entries::()? as u64, }) } @@ -202,8 +213,13 @@ mod tests { generators, generators::{random_block, random_block_range}, }; - use reth_primitives::{stage::StageUnitCheckpoint, BlockNumber, SealedBlock, H256}; - use reth_provider::{BlockReader, ProviderError, TransactionsProvider}; + use reth_primitives::{ + stage::StageUnitCheckpoint, BlockNumber, PruneCheckpoint, PruneMode, SealedBlock, H256, + MAINNET, + }; + use reth_provider::{ + BlockReader, ProviderError, ProviderFactory, PruneCheckpointWriter, TransactionsProvider, + }; // Implement stage test suite. stage_test_suite_ext!(TransactionLookupTestRunner, transaction_lookup); @@ -321,6 +337,57 @@ mod tests { assert!(runner.validate_execution(first_input, result.ok()).is_ok(), "validation failed"); } + #[test] + fn stage_checkpoint_pruned() { + let tx = TestTransaction::default(); + let mut rng = generators::rng(); + + let blocks = random_block_range(&mut rng, 0..=100, H256::zero(), 0..10); + tx.insert_blocks(blocks.iter(), None).expect("insert blocks"); + + let max_pruned_block = 30; + let max_processed_block = 70; + + let mut tx_hash_numbers = Vec::new(); + let mut tx_hash_number = 0; + for block in &blocks[..=max_processed_block] { + for transaction in &block.body { + if block.number > max_pruned_block { + tx_hash_numbers.push((transaction.hash, tx_hash_number)); + } + tx_hash_number += 1; + } + } + tx.insert_tx_hash_numbers(tx_hash_numbers).expect("insert tx hash numbers"); + + let provider = tx.inner_rw(); + provider + .save_prune_checkpoint( + PrunePart::TransactionLookup, + PruneCheckpoint { + block_number: max_pruned_block as BlockNumber, + prune_mode: PruneMode::Full, + }, + ) + .expect("save stage checkpoint"); + provider.commit().expect("commit"); + + let db = tx.inner_raw(); + let factory = ProviderFactory::new(db.as_ref(), MAINNET.clone()); + let provider = factory.provider_rw().expect("provider rw"); + + assert_eq!( + stage_checkpoint(&provider).expect("stage checkpoint"), + EntitiesCheckpoint { + processed: blocks[..=max_processed_block] + .iter() + .map(|block| block.body.len() as u64) + .sum::(), + total: blocks.iter().map(|block| block.body.len() as u64).sum::() + } + ); + } + struct TransactionLookupTestRunner { tx: TestTransaction, threshold: u64,