mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-01-30 01:28:21 -05:00
perf(stages): use regular rayon spawn (#1832)
This commit is contained in:
@@ -80,7 +80,7 @@ impl<DB: Database> Stage<DB> for SenderRecoveryStage {
|
||||
// Iterate over transactions in chunks
|
||||
info!(target: "sync::stages::sender_recovery", start_tx_index, end_tx_index, "Recovering senders");
|
||||
|
||||
// a channel to receive results from a rayon job
|
||||
// an _unordered_ channel to receive results from a rayon job
|
||||
let (tx, rx) = mpsc::unbounded_channel();
|
||||
|
||||
// spawn recovery jobs onto the default rayon threadpool and send the result through the
|
||||
@@ -88,7 +88,10 @@ impl<DB: Database> Stage<DB> for SenderRecoveryStage {
|
||||
for entry in entries {
|
||||
let (tx_id, transaction) = entry?;
|
||||
let tx = tx.clone();
|
||||
rayon::spawn_fifo(move || {
|
||||
|
||||
// Spawn the sender recovery task onto the global rayon pool
|
||||
// This task will send the result through the channel after it recovered the sender.
|
||||
rayon::spawn(move || {
|
||||
let res = if let Some(signer) = transaction.recover_signer() {
|
||||
Ok((tx_id, signer))
|
||||
} else {
|
||||
@@ -100,6 +103,8 @@ impl<DB: Database> Stage<DB> for SenderRecoveryStage {
|
||||
}
|
||||
drop(tx);
|
||||
|
||||
// we need sorted results so we wrap the _unordered_ receiver stream into the sequential
|
||||
// stream which yields the next result (increasing transaction id)
|
||||
let mut recovered_senders =
|
||||
SequentialPairStream::new(start_tx_index, UnboundedReceiverStream::new(rx));
|
||||
|
||||
|
||||
Reference in New Issue
Block a user