perf(engine): prefetch first txs sequentially to avoid rayon scheduling stall (#22305)

Co-authored-by: Amp <amp@ampcode.com>
Co-authored-by: DaniPopes <57450786+DaniPopes@users.noreply.github.com>
This commit is contained in:
Georgios Konstantopoulos
2026-02-18 21:53:52 -08:00
committed by GitHub
parent 8529da976f
commit 8970f82aaf

View File

@@ -352,14 +352,23 @@ where
/// produce fewer state changes and most workers would be idle overhead.
const SMALL_BLOCK_PROOF_WORKER_TX_THRESHOLD: usize = 30;
/// Transaction count threshold below which sequential signature recovery is used.
/// Transaction count threshold below which sequential conversion is used.
///
/// For blocks with fewer than this many transactions, the rayon parallel iterator overhead
/// (work-stealing setup, channel-based reorder) exceeds the cost of sequential ECDSA
/// recovery. Inspired by Nethermind's `RecoverSignature` which uses sequential `foreach`
/// for small blocks.
/// (work-stealing setup, channel-based reorder) exceeds the cost of sequential conversion.
/// Inspired by Nethermind's `RecoverSignature` which uses sequential `foreach` for small
/// blocks.
const SMALL_BLOCK_TX_THRESHOLD: usize = 30;
/// Number of leading transactions to convert sequentially before entering the rayon
/// parallel path.
///
/// Rayon's work-stealing does not guarantee that index 0 is processed first, so the
/// ordered consumer can block for up to ~1ms waiting for the first slot. By converting
/// a small head sequentially and sending it immediately, execution can start without
/// waiting for rayon scheduling.
const PARALLEL_PREFETCH_COUNT: usize = 4;
/// Returns the multiproof chunk size adapted to the block's gas usage.
///
/// For blocks with ≤20M gas used, a smaller chunk size (30) yields better throughput.
@@ -382,7 +391,7 @@ where
///
/// For blocks with fewer than [`Self::SMALL_BLOCK_TX_THRESHOLD`] transactions, uses
/// sequential iteration to avoid rayon overhead. For larger blocks, uses rayon parallel
/// iteration with [`ForEachOrdered`] to recover signatures in parallel while streaming
/// iteration with [`ForEachOrdered`] to convert transactions in parallel while streaming
/// results to execution in the original transaction order.
#[expect(clippy::type_complexity)]
#[instrument(level = "debug", target = "engine::tree::payload_processor", skip_all)]
@@ -401,7 +410,7 @@ where
// Empty block — nothing to do.
} else if transaction_count < Self::SMALL_BLOCK_TX_THRESHOLD {
// Sequential path for small blocks — avoids rayon work-stealing setup and
// channel-based reorder overhead when it costs more than the ECDSA recovery itself.
// channel-based reorder overhead when it costs more than sequential conversion.
debug!(
target: "engine::tree::payload_processor",
transaction_count,
@@ -411,34 +420,36 @@ where
let _enter =
debug_span!(target: "engine::tree::payload_processor", "tx_iterator").entered();
let (transactions, convert) = transactions.into_parts();
for (idx, tx) in transactions.into_iter().enumerate() {
let tx = convert.convert(tx);
let tx = tx.map(|tx| {
let (tx_env, tx) = tx.into_parts();
WithTxEnv { tx_env, tx: Arc::new(tx) }
});
if let Ok(tx) = &tx {
let _ = prewarm_tx.send((idx, tx.clone()));
}
let _ = execute_tx.send(tx);
}
convert_serial(transactions.into_iter(), &convert, &prewarm_tx, &execute_tx);
});
} else {
// Parallel path — recover signatures in parallel on rayon, stream results
// to execution in order via `for_each_ordered`.
//
// To avoid a ~1ms stall waiting for rayon to schedule index 0, the first
// few transactions are recovered sequentially and sent immediately before
// entering the parallel iterator for the remainder.
let prefetch = Self::PARALLEL_PREFETCH_COUNT.min(transaction_count);
self.executor.spawn_blocking(move || {
let _enter =
debug_span!(target: "engine::tree::payload_processor", "tx_iterator").entered();
let (transactions, convert) = transactions.into_parts();
transactions
.into_par_iter()
let mut all: Vec<_> = transactions.into_iter().collect();
let rest = all.split_off(prefetch.min(all.len()));
// Convert the first few transactions sequentially so execution can
// start immediately without waiting for rayon work-stealing.
convert_serial(all.into_iter(), &convert, &prewarm_tx, &execute_tx);
// Convert the remaining transactions in parallel.
rest.into_par_iter()
.enumerate()
.map(|(idx, tx)| {
.map(|(i, tx)| {
let idx = i + prefetch;
let tx = convert.convert(tx);
tx.map(|tx| {
let (tx_env, tx) = tx.into_parts();
let tx = WithTxEnv { tx_env, tx: Arc::new(tx) };
// Send to prewarming out of order with the original index.
let _ = prewarm_tx.send((idx, tx.clone()));
tx
})
@@ -708,6 +719,30 @@ where
}
}
/// Converts transactions sequentially and sends them to the prewarm and execute channels.
///
/// Each raw transaction is converted via `convert`. A successful conversion is forwarded
/// to the prewarm channel tagged with its original index, and every result — success or
/// failure — is streamed to the execute channel in input order. Send failures are
/// deliberately discarded (`let _`); presumably a closed receiver means the consumer has
/// shut down.
fn convert_serial<RawTx, Tx, TxEnv, InnerTx, Recovered, Err, C>(
    iter: impl Iterator<Item = RawTx>,
    convert: &C,
    prewarm_tx: &mpsc::SyncSender<(usize, WithTxEnv<TxEnv, Recovered>)>,
    execute_tx: &mpsc::SyncSender<Result<WithTxEnv<TxEnv, Recovered>, Err>>,
) where
    Tx: ExecutableTxParts<TxEnv, InnerTx, Recovered = Recovered>,
    TxEnv: Clone,
    C: ConvertTx<RawTx, Tx = Tx, Error = Err>,
{
    for (idx, raw) in iter.enumerate() {
        // Wrap the recovered transaction in an `Arc` so the prewarm and execute
        // consumers can share it without a deep clone.
        let converted = convert.convert(raw).map(|tx| {
            let (tx_env, inner) = tx.into_parts();
            WithTxEnv { tx_env, tx: Arc::new(inner) }
        });
        // Prewarming only receives successful conversions; the index lets the
        // out-of-order prewarm side map results back to the original position.
        if let Ok(with_env) = &converted {
            let _ = prewarm_tx.send((idx, with_env.clone()));
        }
        // Execution receives every result, including conversion errors, in order.
        let _ = execute_tx.send(converted);
    }
}
/// Handle to all the spawned tasks.
///
/// Generic over `R` (receipt type) to allow sharing `Arc<ExecutionOutcome<R>>` with the