mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-02-19 03:04:27 -05:00
fix(engine): send correct transaction index in prewarm task (#22223)
Co-authored-by: Amp <amp@ampcode.com>
This commit is contained in:
committed by
GitHub
parent
9265e8e46c
commit
f0c4be108b
@@ -365,7 +365,7 @@ where
|
||||
transactions: I,
|
||||
transaction_count: usize,
|
||||
) -> (
|
||||
mpsc::Receiver<WithTxEnv<TxEnvFor<Evm>, I::Recovered>>,
|
||||
mpsc::Receiver<(usize, WithTxEnv<TxEnvFor<Evm>, I::Recovered>)>,
|
||||
mpsc::Receiver<Result<WithTxEnv<TxEnvFor<Evm>, I::Recovered>, I::Error>>,
|
||||
) {
|
||||
let (prewarm_tx, prewarm_rx) = mpsc::sync_channel(transaction_count);
|
||||
@@ -383,14 +383,14 @@ where
|
||||
);
|
||||
self.executor.spawn_blocking(move || {
|
||||
let (transactions, convert) = transactions.into_parts();
|
||||
for tx in transactions {
|
||||
for (idx, tx) in transactions.into_iter().enumerate() {
|
||||
let tx = convert.convert(tx);
|
||||
let tx = tx.map(|tx| {
|
||||
let (tx_env, tx) = tx.into_parts();
|
||||
WithTxEnv { tx_env, tx: Arc::new(tx) }
|
||||
});
|
||||
if let Ok(tx) = &tx {
|
||||
let _ = prewarm_tx.send(tx.clone());
|
||||
let _ = prewarm_tx.send((idx, tx.clone()));
|
||||
}
|
||||
let _ = execute_tx.send(tx);
|
||||
}
|
||||
@@ -402,13 +402,14 @@ where
|
||||
let (transactions, convert) = transactions.into_parts();
|
||||
transactions
|
||||
.into_par_iter()
|
||||
.map(|tx| {
|
||||
.enumerate()
|
||||
.map(|(idx, tx)| {
|
||||
let tx = convert.convert(tx);
|
||||
tx.map(|tx| {
|
||||
let (tx_env, tx) = tx.into_parts();
|
||||
let tx = WithTxEnv { tx_env, tx: Arc::new(tx) };
|
||||
// Send to prewarming out of order — order doesn't matter there.
|
||||
let _ = prewarm_tx.send(tx.clone());
|
||||
// Send to prewarming out of order with the original index.
|
||||
let _ = prewarm_tx.send((idx, tx.clone()));
|
||||
tx
|
||||
})
|
||||
})
|
||||
@@ -431,7 +432,7 @@ where
|
||||
fn spawn_caching_with<P>(
|
||||
&self,
|
||||
env: ExecutionEnv<Evm>,
|
||||
transactions: mpsc::Receiver<impl ExecutableTxFor<Evm> + Clone + Send + 'static>,
|
||||
transactions: mpsc::Receiver<(usize, impl ExecutableTxFor<Evm> + Clone + Send + 'static)>,
|
||||
provider_builder: StateProviderBuilder<N, P>,
|
||||
to_multi_proof: Option<CrossbeamSender<MultiProofMessage>>,
|
||||
bal: Option<Arc<BlockAccessList>>,
|
||||
|
||||
@@ -49,8 +49,8 @@ use tracing::{debug, debug_span, instrument, trace, warn, Span};
|
||||
/// Determines the prewarming mode: transaction-based, BAL-based, or skipped.
|
||||
#[derive(Debug)]
|
||||
pub enum PrewarmMode<Tx> {
|
||||
/// Prewarm by executing transactions from a stream.
|
||||
Transactions(Receiver<Tx>),
|
||||
/// Prewarm by executing transactions from a stream, each paired with its block index.
|
||||
Transactions(Receiver<(usize, Tx)>),
|
||||
/// Prewarm by prefetching slots from a Block Access List.
|
||||
BlockAccessList(Arc<BlockAccessList>),
|
||||
/// Transaction prewarming is skipped (e.g. small blocks where the overhead exceeds the
|
||||
@@ -133,7 +133,7 @@ where
|
||||
/// subsequent transactions in the block.
|
||||
fn spawn_all<Tx>(
|
||||
&self,
|
||||
pending: mpsc::Receiver<Tx>,
|
||||
pending: mpsc::Receiver<(usize, Tx)>,
|
||||
actions_tx: Sender<PrewarmTaskEvent<N::Receipt>>,
|
||||
to_multi_proof: Option<CrossbeamSender<MultiProofMessage>>,
|
||||
) where
|
||||
@@ -161,8 +161,8 @@ where
|
||||
let tx_sender = ctx.clone().spawn_workers(workers_needed, &executor, to_multi_proof.clone(), done_tx.clone());
|
||||
|
||||
// Distribute transactions to workers
|
||||
let mut tx_index = 0usize;
|
||||
while let Ok(tx) = pending.recv() {
|
||||
let mut tx_count = 0usize;
|
||||
while let Ok((tx_index, tx)) = pending.recv() {
|
||||
// Stop distributing if termination was requested
|
||||
if ctx.terminate_execution.load(Ordering::Relaxed) {
|
||||
trace!(
|
||||
@@ -179,7 +179,7 @@ where
|
||||
// exit early when signaled.
|
||||
let _ = tx_sender.send(indexed_tx);
|
||||
|
||||
tx_index += 1;
|
||||
tx_count += 1;
|
||||
}
|
||||
|
||||
// Send withdrawal prefetch targets after all transactions have been distributed
|
||||
@@ -198,7 +198,7 @@ where
|
||||
while done_rx.recv().is_ok() {}
|
||||
|
||||
let _ = actions_tx
|
||||
.send(PrewarmTaskEvent::FinishedTxExecution { executed_transactions: tx_index });
|
||||
.send(PrewarmTaskEvent::FinishedTxExecution { executed_transactions: tx_count });
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user