diff --git a/crates/stages/src/stages/sender_recovery.rs b/crates/stages/src/stages/sender_recovery.rs index 6ec8f71cb3..0e590737e8 100644 --- a/crates/stages/src/stages/sender_recovery.rs +++ b/crates/stages/src/stages/sender_recovery.rs @@ -101,7 +101,7 @@ impl Stage for SenderRecoveryStage { for chunk_range in chunks { // An _unordered_ channel to receive results from a rayon job let (recovered_senders_tx, recovered_senders_rx) = mpsc::channel(); - channels.push(recovered_senders_rx); + channels.push((chunk_range.clone(), recovered_senders_rx)); let static_file_provider = provider.static_file_provider().clone(); @@ -109,10 +109,11 @@ impl Stage for SenderRecoveryStage { // This task will send the results through the channel after it has read the transaction // and calculated the sender. rayon::spawn(move || { + debug!(target: "sync::stages::sender_recovery", ?chunk_range, "Recovering senders batch"); let mut rlp_buf = Vec::with_capacity(128); let _ = static_file_provider.fetch_range_with_predicate( StaticFileSegment::Transactions, - chunk_range, + chunk_range.clone(), |cursor, number| { Ok(cursor .get_one::>(number.into())? @@ -124,11 +125,13 @@ impl Stage for SenderRecoveryStage { }, |_| true, ); + debug!(target: "sync::stages::sender_recovery", ?chunk_range, "Finished recovering senders batch"); }); } // Iterate over channels and append the sender in the order that they are received. - for channel in channels { + for (chunk_range, channel) in channels { + debug!(target: "sync::stages::sender_recovery", ?chunk_range, "Appending recovered senders to the database"); while let Ok(recovered) = channel.recv() { let (tx_id, sender) = match recovered { Ok(result) => result,