diff --git a/crates/stages/src/stages/sender_recovery.rs b/crates/stages/src/stages/sender_recovery.rs index d96a78c123..88682379b9 100644 --- a/crates/stages/src/stages/sender_recovery.rs +++ b/crates/stages/src/stages/sender_recovery.rs @@ -80,7 +80,7 @@ impl Stage for SenderRecoveryStage { // Iterate over transactions in chunks info!(target: "sync::stages::sender_recovery", start_tx_index, end_tx_index, "Recovering senders"); - // a channel to receive results from a rayon job + // an _unordered_ channel to receive results from a rayon job let (tx, rx) = mpsc::unbounded_channel(); // spawn recovery jobs onto the default rayon threadpool and send the result through the @@ -88,7 +88,10 @@ impl Stage for SenderRecoveryStage { for entry in entries { let (tx_id, transaction) = entry?; let tx = tx.clone(); - rayon::spawn_fifo(move || { + + // Spawn the sender recovery task onto the global rayon pool + // This task will send the result through the channel after it recovered the sender. + rayon::spawn(move || { let res = if let Some(signer) = transaction.recover_signer() { Ok((tx_id, signer)) } else { @@ -100,6 +103,8 @@ impl Stage for SenderRecoveryStage { } drop(tx); + // we need sorted results so we wrap the _unordered_ receiver stream into the sequential + // stream which yields the next result (increasing transaction id) let mut recovered_senders = SequentialPairStream::new(start_tx_index, UnboundedReceiverStream::new(rx));