diff --git a/crates/stages/src/stages/sender_recovery.rs b/crates/stages/src/stages/sender_recovery.rs index 88682379b9..dc0e39aa97 100644 --- a/crates/stages/src/stages/sender_recovery.rs +++ b/crates/stages/src/stages/sender_recovery.rs @@ -3,6 +3,7 @@ use crate::{ StageError, StageId, UnwindInput, UnwindOutput, }; use futures_util::StreamExt; +use itertools::Itertools; use reth_db::{ cursor::{DbCursorRO, DbCursorRW}, database::Database, @@ -75,36 +76,49 @@ impl Stage for SenderRecoveryStage { // Acquire the cursor over the transactions let mut tx_cursor = tx.cursor_read::()?; // Walk the transactions from start to end index (inclusive) - let entries = tx_cursor.walk_range(start_tx_index..=end_tx_index)?; + let tx_walker = tx_cursor.walk_range(start_tx_index..=end_tx_index)?; // Iterate over transactions in chunks info!(target: "sync::stages::sender_recovery", start_tx_index, end_tx_index, "Recovering senders"); - // an _unordered_ channel to receive results from a rayon job + // An _unordered_ channel to receive results from a rayon job let (tx, rx) = mpsc::unbounded_channel(); - // spawn recovery jobs onto the default rayon threadpool and send the result through the - // channel - for entry in entries { - let (tx_id, transaction) = entry?; + // Spawn recovery jobs onto the default rayon threadpool and send the result through the + // channel. + // + // We try to evenly divide the transactions to recover across all threads in the threadpool. + // Chunks are submitted instead of individual transactions to reduce the overhead of work + // stealing in the threadpool workers. + for chunk in + &tx_walker.chunks(self.commit_threshold as usize / rayon::current_num_threads()) + { let tx = tx.clone(); + // Note: Unfortunate side-effect of how chunk is designed in itertools (it is not Send) + let mut chunk: Vec<_> = chunk.collect(); // Spawn the sender recovery task onto the global rayon pool - // This task will send the result through the channel after it recovered the sender. + // This task will send the results through the channel after it recovered the senders. rayon::spawn(move || { - let res = if let Some(signer) = transaction.recover_signer() { - Ok((tx_id, signer)) - } else { - Err(StageError::from(SenderRecoveryStageError::SenderRecovery { tx: tx_id })) - }; - // send the result back - let _ = tx.send(res); + chunk + .drain(..) + .map(|entry| { + let (tx_id, transaction) = entry?; + let sender = transaction.recover_signer().ok_or(StageError::from( + SenderRecoveryStageError::SenderRecovery { tx: tx_id }, + ))?; + + Ok((tx_id, sender)) + }) + .for_each(|result: Result<_, StageError>| { + let _ = tx.send(result); + }); }); } drop(tx); - // we need sorted results so we wrap the _unordered_ receiver stream into the sequential - // stream which yields the next result (increasing transaction id) + // We need sorted results, so we wrap the _unordered_ receiver stream into a sequential + // stream, which yields the results by ascending transaction ID. let mut recovered_senders = SequentialPairStream::new(start_tx_index, UnboundedReceiverStream::new(rx));