From 99a314c59bbd94a34a285369da95fb5604883c65 Mon Sep 17 00:00:00 2001 From: Roman Krasiuk Date: Wed, 7 Jun 2023 16:44:45 +0300 Subject: [PATCH] feat(stages): normalized tx lookup commits (#3022) --- crates/stages/src/stages/tx_lookup.rs | 102 ++++++++++++-------------- 1 file changed, 46 insertions(+), 56 deletions(-) diff --git a/crates/stages/src/stages/tx_lookup.rs b/crates/stages/src/stages/tx_lookup.rs index 2471000127..ac5aa780f8 100644 --- a/crates/stages/src/stages/tx_lookup.rs +++ b/crates/stages/src/stages/tx_lookup.rs @@ -11,11 +11,10 @@ use reth_db::{ use reth_primitives::{ rpc_utils::keccak256, stage::{EntitiesCheckpoint, StageCheckpoint, StageId}, - BlockNumber, TransactionSignedNoHash, TxNumber, H256, + TransactionSignedNoHash, TxNumber, H256, }; use reth_provider::Transaction; use std::ops::Deref; -use thiserror::Error; use tokio::sync::mpsc; use tracing::*; @@ -26,13 +25,13 @@ use tracing::*; /// [`tables::TxHashNumber`] This is used for looking up changesets via the transaction hash. #[derive(Debug, Clone)] pub struct TransactionLookupStage { - /// The number of blocks to commit at once + /// The number of lookup entries to commit at once commit_threshold: u64, } impl Default for TransactionLookupStage { fn default() -> Self { - Self { commit_threshold: 50_000 } + Self { commit_threshold: 100_000 } } } @@ -56,27 +55,18 @@ impl Stage for TransactionLookupStage { tx: &mut Transaction<'_, DB>, input: ExecInput, ) -> Result { - let (range, is_final_range) = input.next_block_range_with_threshold(self.commit_threshold); - if range.is_empty() { - return Ok(ExecOutput::done(*range.end())) + if input.target_reached() { + return Ok(ExecOutput::done(input.checkpoint().block_number)) } - let (start_block, end_block) = range.into_inner(); - debug!(target: "sync::stages::transaction_lookup", start_block, end_block, "Commencing sync"); + let (tx_range, block_range, is_final_range) = + input.next_block_range_with_transaction_threshold(tx, self.commit_threshold)?; + let end_block = *block_range.end(); - let mut block_meta_cursor = tx.cursor_read::()?; - - let (_, first_block) = block_meta_cursor.seek_exact(start_block)?.ok_or( - StageError::from(TransactionLookupStageError::TransactionLookup { block: start_block }), - )?; - - let (_, last_block) = block_meta_cursor.seek_exact(end_block)?.ok_or(StageError::from( - TransactionLookupStageError::TransactionLookup { block: end_block }, - ))?; + debug!(target: "sync::stages::transaction_lookup", ?tx_range, "Updating transaction lookup"); let mut tx_cursor = tx.cursor_read::()?; - let tx_walker = - tx_cursor.walk_range(first_block.first_tx_num()..=last_block.last_tx_num())?; + let tx_walker = tx_cursor.walk_range(tx_range)?; let chunk_size = 100_000 / rayon::current_num_threads(); let mut channels = Vec::with_capacity(chunk_size); @@ -192,18 +182,6 @@ impl Stage for TransactionLookupStage { } } -#[derive(Error, Debug)] -enum TransactionLookupStageError { - #[error("Transaction lookup failed to find block {block}.")] - TransactionLookup { block: BlockNumber }, -} - -impl From for StageError { - fn from(error: TransactionLookupStageError) -> Self { - StageError::Fatal(Box::new(error)) - } -} - fn stage_checkpoint( tx: &Transaction<'_, DB>, ) -> Result { @@ -280,40 +258,52 @@ mod tests { }; // Seed only once with full input range - runner.seed_execution(first_input).expect("failed to seed execution"); + let seed = random_block_range(stage_progress + 1..=previous_stage, H256::zero(), 0..4); // set tx count range high enough to hit the threshold + runner.tx.insert_blocks(seed.iter(), None).expect("failed to seed execution"); + + let total_txs = runner.tx.table::().unwrap().len() as u64; // Execute first time let result = runner.execute(first_input).await.unwrap(); - let expected_progress = stage_progress + threshold; - assert_matches!( - result, - Ok(ExecOutput { checkpoint: StageCheckpoint { - block_number, - stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint { - processed, - total - })) - }, done: false }) if block_number == expected_progress && - processed == runner.tx.table::().unwrap().len() as u64 && - total == runner.tx.table::().unwrap().len() as u64 + let mut tx_count = 0; + let expected_progress = seed + .iter() + .find(|x| { + tx_count += x.body.len(); + tx_count as u64 > threshold + }) + .map(|x| x.number) + .unwrap_or(previous_stage); + assert_matches!(result, Ok(_)); + assert_eq!( + result.unwrap(), + ExecOutput { + checkpoint: StageCheckpoint::new(expected_progress).with_entities_stage_checkpoint( + EntitiesCheckpoint { + processed: runner.tx.table::().unwrap().len() as u64, + total: total_txs + } + ), + done: false + } ); - // Execute second time + // Execute second time to completion + runner.set_threshold(u64::MAX); let second_input = ExecInput { previous_stage: Some((PREV_STAGE_ID, previous_stage)), checkpoint: Some(StageCheckpoint::new(expected_progress)), }; let result = runner.execute(second_input).await.unwrap(); - assert_matches!( - result, - Ok(ExecOutput {checkpoint: StageCheckpoint { - block_number, - stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint { - processed, - total - })) - }, done: true }) if block_number == previous_stage && processed == total && - total == runner.tx.table::().unwrap().len() as u64 + assert_matches!(result, Ok(_)); + assert_eq!( + result.as_ref().unwrap(), + &ExecOutput { + checkpoint: StageCheckpoint::new(previous_stage).with_entities_stage_checkpoint( + EntitiesCheckpoint { processed: total_txs, total: total_txs } + ), + done: true + } ); assert!(runner.validate_execution(first_input, result.ok()).is_ok(), "validation failed");