diff --git a/crates/stages/stages/src/stages/sender_recovery.rs b/crates/stages/stages/src/stages/sender_recovery.rs index a85b0bc60c..a4eda6394c 100644 --- a/crates/stages/stages/src/stages/sender_recovery.rs +++ b/crates/stages/stages/src/stages/sender_recovery.rs @@ -29,6 +29,9 @@ const BATCH_SIZE: usize = 100_000; /// Maximum number of senders to recover per rayon worker job. const WORKER_CHUNK_SIZE: usize = 100; +/// Type alias for a sender that transmits the result of sender recovery. +type RecoveryResultSender = mpsc::Sender>>; + /// The sender recovery stage iterates over existing transactions, /// recovers the transaction signer and stores them /// in [`TransactionSenders`][reth_db::tables::TransactionSenders] table. @@ -100,8 +103,10 @@ where .map(|start| start..std::cmp::min(start + BATCH_SIZE as u64, tx_range.end)) .collect::>>(); + let tx_batch_sender = setup_range_recovery(provider); + for range in batch { - recover_range(range, provider, &mut senders_cursor)?; + recover_range(range, provider, tx_batch_sender.clone(), &mut senders_cursor)?; } Ok(ExecOutput { @@ -136,15 +141,16 @@ where fn recover_range( tx_range: Range, provider: &Provider, + tx_batch_sender: mpsc::Sender, RecoveryResultSender)>>, senders_cursor: &mut CURSOR, ) -> Result<(), StageError> where Provider: DBProvider + HeaderProvider + StaticFileProviderFactory, CURSOR: DbCursorRW, { - debug!(target: "sync::stages::sender_recovery", ?tx_range, "Recovering senders batch"); + debug!(target: "sync::stages::sender_recovery", ?tx_range, "Sending batch for processing"); - // Preallocate channels + // Preallocate channels for each chunks in the batch let (chunks, receivers): (Vec<_>, Vec<_>) = tx_range .clone() .step_by(WORKER_CHUNK_SIZE) @@ -156,62 +162,9 @@ where }) .unzip(); - let static_file_provider = provider.static_file_provider(); - - // We do not use `tokio::task::spawn_blocking` because, during a shutdown, - // there will be a timeout grace period in which Tokio does not allow spawning - // additional blocking tasks. This would cause this function to return - // `SenderRecoveryStageError::RecoveredSendersMismatch` at the end. - // - // However, using `std::thread::spawn` allows us to utilize the timeout grace - // period to complete some work without throwing errors during the shutdown. - std::thread::spawn(move || { - for (chunk_range, recovered_senders_tx) in chunks { - // Read the raw value, and let the rayon worker to decompress & decode. - let chunk = match static_file_provider.fetch_range_with_predicate( - StaticFileSegment::Transactions, - chunk_range.clone(), - |cursor, number| { - Ok(cursor - .get_one::>>( - number.into(), - )? - .map(|tx| (number, tx))) - }, - |_| true, - ) { - Ok(chunk) => chunk, - Err(err) => { - // We exit early since we could not process this chunk. - let _ = recovered_senders_tx - .send(Err(Box::new(SenderRecoveryStageError::StageError(err.into())))); - break - } - }; - - // Spawn the task onto the global rayon pool - // This task will send the results through the channel after it has read the transaction - // and calculated the sender. - rayon::spawn(move || { - let mut rlp_buf = Vec::with_capacity(128); - for (number, tx) in chunk { - let res = tx - .value() - .map_err(|err| Box::new(SenderRecoveryStageError::StageError(err.into()))) - .and_then(|tx| recover_sender((number, tx), &mut rlp_buf)); - - let is_err = res.is_err(); - - let _ = recovered_senders_tx.send(res); - - // Finish early - if is_err { - break - } - } - }); - } - }); + if let Some(err) = tx_batch_sender.send(chunks).err() { + return Err(StageError::Fatal(err.into())); + } debug!(target: "sync::stages::sender_recovery", ?tx_range, "Appending recovered senders to the database"); @@ -235,6 +188,7 @@ where provider.sealed_header(block_number)?.ok_or_else(|| { ProviderError::HeaderNotFound(block_number.into()) })?; + Err(StageError::Block { block: Box::new(sealed_header), error: BlockErrorKind::Validation( @@ -269,10 +223,82 @@ where .into(), )); } - Ok(()) } +/// Spawns a thread to handle the recovery of transaction senders for +/// specified chunks of a given batch. It processes incoming ranges, fetching and recovering +/// transactions in parallel using global rayon pool +fn setup_range_recovery( + provider: &Provider, +) -> mpsc::Sender, RecoveryResultSender)>> +where + Provider: DBProvider + HeaderProvider + StaticFileProviderFactory, +{ + let (tx_sender, tx_receiver) = mpsc::channel::, RecoveryResultSender)>>(); + let static_file_provider = provider.static_file_provider(); + + // We do not use `tokio::task::spawn_blocking` because, during a shutdown, + // there will be a timeout grace period in which Tokio does not allow spawning + // additional blocking tasks. This would cause this function to return + // `SenderRecoveryStageError::RecoveredSendersMismatch` at the end. + // + // However, using `std::thread::spawn` allows us to utilize the timeout grace + // period to complete some work without throwing errors during the shutdown. + std::thread::spawn(move || { + while let Ok(chunks) = tx_receiver.recv() { + for (chunk_range, recovered_senders_tx) in chunks { + // Read the raw value, and let the rayon worker to decompress & decode. + let chunk = match static_file_provider.fetch_range_with_predicate( + StaticFileSegment::Transactions, + chunk_range.clone(), + |cursor, number| { + Ok(cursor + .get_one::>>( + number.into(), + )? + .map(|tx| (number, tx))) + }, + |_| true, + ) { + Ok(chunk) => chunk, + Err(err) => { + // We exit early since we could not process this chunk. + let _ = recovered_senders_tx + .send(Err(Box::new(SenderRecoveryStageError::StageError(err.into())))); + break + } + }; + + // Spawn the task onto the global rayon pool + // This task will send the results through the channel after it has read the + // transaction and calculated the sender. + rayon::spawn(move || { + let mut rlp_buf = Vec::with_capacity(128); + for (number, tx) in chunk { + let res = tx + .value() + .map_err(|err| { + Box::new(SenderRecoveryStageError::StageError(err.into())) + }) + .and_then(|tx| recover_sender((number, tx), &mut rlp_buf)); + + let is_err = res.is_err(); + + let _ = recovered_senders_tx.send(res); + + // Finish early + if is_err { + break + } + } + }); + } + } + }); + tx_sender +} + #[inline] fn recover_sender( (tx_id, tx): (TxNumber, TransactionSignedNoHash),