diff --git a/crates/engine/tree/src/tree/payload_processor/mod.rs b/crates/engine/tree/src/tree/payload_processor/mod.rs index a509ede133..add26fe8fa 100644 --- a/crates/engine/tree/src/tree/payload_processor/mod.rs +++ b/crates/engine/tree/src/tree/payload_processor/mod.rs @@ -365,7 +365,7 @@ where transactions: I, transaction_count: usize, ) -> ( - mpsc::Receiver, I::Recovered>>, + mpsc::Receiver<(usize, WithTxEnv, I::Recovered>)>, mpsc::Receiver, I::Recovered>, I::Error>>, ) { let (prewarm_tx, prewarm_rx) = mpsc::sync_channel(transaction_count); @@ -383,14 +383,14 @@ where ); self.executor.spawn_blocking(move || { let (transactions, convert) = transactions.into_parts(); - for tx in transactions { + for (idx, tx) in transactions.into_iter().enumerate() { let tx = convert.convert(tx); let tx = tx.map(|tx| { let (tx_env, tx) = tx.into_parts(); WithTxEnv { tx_env, tx: Arc::new(tx) } }); if let Ok(tx) = &tx { - let _ = prewarm_tx.send(tx.clone()); + let _ = prewarm_tx.send((idx, tx.clone())); } let _ = execute_tx.send(tx); } @@ -402,13 +402,14 @@ where let (transactions, convert) = transactions.into_parts(); transactions .into_par_iter() - .map(|tx| { + .enumerate() + .map(|(idx, tx)| { let tx = convert.convert(tx); tx.map(|tx| { let (tx_env, tx) = tx.into_parts(); let tx = WithTxEnv { tx_env, tx: Arc::new(tx) }; - // Send to prewarming out of order — order doesn't matter there. - let _ = prewarm_tx.send(tx.clone()); + // Send to prewarming out of order with the original index. + let _ = prewarm_tx.send((idx, tx.clone())); tx }) }) @@ -431,7 +432,7 @@ where fn spawn_caching_with

( &self, env: ExecutionEnv, - transactions: mpsc::Receiver + Clone + Send + 'static>, + transactions: mpsc::Receiver<(usize, impl ExecutableTxFor + Clone + Send + 'static)>, provider_builder: StateProviderBuilder, to_multi_proof: Option>, bal: Option>, diff --git a/crates/engine/tree/src/tree/payload_processor/prewarm.rs b/crates/engine/tree/src/tree/payload_processor/prewarm.rs index c0a944a2e6..a432c1b3e8 100644 --- a/crates/engine/tree/src/tree/payload_processor/prewarm.rs +++ b/crates/engine/tree/src/tree/payload_processor/prewarm.rs @@ -49,8 +49,8 @@ use tracing::{debug, debug_span, instrument, trace, warn, Span}; /// Determines the prewarming mode: transaction-based, BAL-based, or skipped. #[derive(Debug)] pub enum PrewarmMode { - /// Prewarm by executing transactions from a stream. - Transactions(Receiver), + /// Prewarm by executing transactions from a stream, each paired with its block index. + Transactions(Receiver<(usize, Tx)>), /// Prewarm by prefetching slots from a Block Access List. BlockAccessList(Arc), /// Transaction prewarming is skipped (e.g. small blocks where the overhead exceeds the @@ -133,7 +133,7 @@ where /// subsequent transactions in the block. fn spawn_all( &self, - pending: mpsc::Receiver, + pending: mpsc::Receiver<(usize, Tx)>, actions_tx: Sender>, to_multi_proof: Option>, ) where @@ -161,8 +161,8 @@ where let tx_sender = ctx.clone().spawn_workers(workers_needed, &executor, to_multi_proof.clone(), done_tx.clone()); // Distribute transactions to workers - let mut tx_index = 0usize; - while let Ok(tx) = pending.recv() { + let mut tx_count = 0usize; + while let Ok((tx_index, tx)) = pending.recv() { // Stop distributing if termination was requested if ctx.terminate_execution.load(Ordering::Relaxed) { trace!( @@ -179,7 +179,7 @@ where // exit early when signaled. let _ = tx_sender.send(indexed_tx); - tx_index += 1; + tx_count += 1; } // Send withdrawal prefetch targets after all transactions have been distributed @@ -198,7 +198,7 @@ where while done_rx.recv().is_ok() {} let _ = actions_tx - .send(PrewarmTaskEvent::FinishedTxExecution { executed_transactions: tx_index }); + .send(PrewarmTaskEvent::FinishedTxExecution { executed_transactions: tx_count }); }); }