From 4a62d38af2fecee884313451d5c1b72962a583bf Mon Sep 17 00:00:00 2001
From: Georgios Konstantopoulos
Date: Thu, 12 Feb 2026 10:06:21 -0500
Subject: [PATCH] perf(engine): use sequential sig recovery for small blocks
 (#22077)

Blocks with fewer than 30 transactions now convert and recover transaction
signatures sequentially on a single blocking task, since rayon's work-stealing
setup and channel-based reordering cost more than the recovery itself at that
size; larger blocks keep the parallel rayon path. The `Either`-based
transaction tuple now yields an `EitherIter` wrapper implementing both
`IntoIterator` and `IntoParallelIterator` so either path can consume it.

Co-authored-by: Amp
Co-authored-by: Ubuntu
Co-authored-by: YK
---
 .../tree/src/tree/payload_processor/mod.rs | 75 +++++++++++++----
 crates/evm/evm/src/engine.rs               | 81 +++++++++++++------
 2 files changed, 115 insertions(+), 41 deletions(-)

diff --git a/crates/engine/tree/src/tree/payload_processor/mod.rs b/crates/engine/tree/src/tree/payload_processor/mod.rs
index 5d25f67383..bb34a75dfa 100644
--- a/crates/engine/tree/src/tree/payload_processor/mod.rs
+++ b/crates/engine/tree/src/tree/payload_processor/mod.rs
@@ -235,7 +235,8 @@ where
         + 'static,
     {
         // start preparing transactions immediately
-        let (prewarm_rx, execution_rx) = self.spawn_tx_iterator(transactions);
+        let (prewarm_rx, execution_rx) =
+            self.spawn_tx_iterator(transactions, env.transaction_count);
 
         let span = Span::current();
         let (to_sparse_trie, sparse_trie_rx) = channel();
@@ -342,7 +343,8 @@
     where
         P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
     {
-        let (prewarm_rx, execution_rx) = self.spawn_tx_iterator(transactions);
+        let (prewarm_rx, execution_rx) =
+            self.spawn_tx_iterator(transactions, env.transaction_count);
         // This path doesn't use multiproof, so V2 proofs flag doesn't matter
         let prewarm_handle =
             self.spawn_caching_with(env, prewarm_rx, provider_builder, None, bal, false);
@@ -355,11 +357,23 @@
         }
     }
+    /// Transaction count threshold below which sequential signature recovery is used.
+    ///
+    /// For blocks with fewer than this many transactions, the rayon parallel iterator overhead
+    /// (work-stealing setup, channel-based reorder) exceeds the cost of sequential ECDSA
+    /// recovery. Inspired by Nethermind's `RecoverSignature`, which uses a sequential
+    /// `foreach` for small blocks.
+    const SMALL_BLOCK_TX_THRESHOLD: usize = 30;
+
     /// Spawns a task advancing the transaction env iterator and streaming updates through a channel.
+    ///
+    /// For blocks with fewer than [`Self::SMALL_BLOCK_TX_THRESHOLD`] transactions, uses
+    /// sequential iteration to avoid rayon overhead.
     #[expect(clippy::type_complexity)]
     fn spawn_tx_iterator<I: ExecutableTxIterator<Evm>>(
         &self,
         transactions: I,
+        transaction_count: usize,
     ) -> (
         mpsc::Receiver<WithTxEnv<TxEnvFor<Evm>, I::Recovered>>,
         mpsc::Receiver<Result<WithTxEnv<TxEnvFor<Evm>, I::Recovered>, I::Error>>,
     ) {
@@ -368,22 +382,51 @@
         let (prewarm_tx, prewarm_rx) = mpsc::channel();
         let (execute_tx, execute_rx) = mpsc::channel();
 
-        // Spawn a task that `convert`s all transactions in parallel and sends them out-of-order.
-        rayon::spawn(move || {
-            let (transactions, convert) = transactions.into_parts();
-            transactions.into_par_iter().enumerate().for_each_with(ooo_tx, |ooo_tx, (idx, tx)| {
-                let tx = convert.convert(tx);
-                let tx = tx.map(|tx| {
-                    let (tx_env, tx) = tx.into_parts();
-                    WithTxEnv { tx_env, tx: Arc::new(tx) }
-                });
-                // Only send Ok(_) variants to prewarming task.
-                if let Ok(tx) = &tx {
-                    let _ = prewarm_tx.send(tx.clone());
-                }
-                let _ = ooo_tx.send((idx, tx));
-            });
-        });
+        if transaction_count == 0 {
+            // Empty block: nothing to do.
+        } else if transaction_count < Self::SMALL_BLOCK_TX_THRESHOLD {
+            // Sequential path for small blocks: avoids rayon work-stealing setup and
+            // channel-based reorder overhead when it costs more than the ECDSA recovery itself.
+            debug!(
+                target: "engine::tree::payload_processor",
+                transaction_count,
+                "using sequential sig recovery for small block"
+            );
+            self.executor.spawn_blocking(move || {
+                let (transactions, convert) = transactions.into_parts();
+                for (idx, tx) in transactions.into_iter().enumerate() {
+                    let tx = convert.convert(tx);
+                    let tx = tx.map(|tx| {
+                        let (tx_env, tx) = tx.into_parts();
+                        WithTxEnv { tx_env, tx: Arc::new(tx) }
+                    });
+                    if let Ok(tx) = &tx {
+                        let _ = prewarm_tx.send(tx.clone());
+                    }
+                    let _ = ooo_tx.send((idx, tx));
+                }
+            });
+        } else {
+            // Parallel path: spawn on rayon for parallel signature recovery.
+            rayon::spawn(move || {
+                let (transactions, convert) = transactions.into_parts();
+                transactions.into_par_iter().enumerate().for_each_with(
+                    ooo_tx,
+                    |ooo_tx, (idx, tx)| {
+                        let tx = convert.convert(tx);
+                        let tx = tx.map(|tx| {
+                            let (tx_env, tx) = tx.into_parts();
+                            WithTxEnv { tx_env, tx: Arc::new(tx) }
+                        });
+                        // Only send Ok(_) variants to prewarming task.
+                        if let Ok(tx) = &tx {
+                            let _ = prewarm_tx.send(tx.clone());
+                        }
+                        let _ = ooo_tx.send((idx, tx));
+                    },
+                );
+            });
+        }
 
         // Spawn a task that processes out-of-order transactions from the task above and sends them
         // to the execution task in order.

diff --git a/crates/evm/evm/src/engine.rs b/crates/evm/evm/src/engine.rs
index 761a2eea07..045e68db04 100644
--- a/crates/evm/evm/src/engine.rs
+++ b/crates/evm/evm/src/engine.rs
@@ -83,6 +83,7 @@ pub trait ExecutableTxTuple: Send + 'static {
     /// Iterator over [`ExecutableTxTuple::Tx`].
     type IntoIter: IntoParallelIterator<Item = Self::RawTx>
+        + IntoIterator<Item = Self::RawTx>
         + Send
         + 'static;
     /// Converter that can be used to convert a [`ExecutableTxTuple::RawTx`] to a
@@ -99,7 +100,10 @@ where
     RawTx: Send + Sync + 'static,
     Tx: Clone + Send + Sync + 'static,
     Err: core::error::Error + Send + Sync + 'static,
-    I: IntoParallelIterator<Item = RawTx> + Send + 'static,
+    I: IntoParallelIterator<Item = RawTx>
+        + IntoIterator<Item = RawTx>
+        + Send
+        + 'static,
     F: Fn(RawTx) -> Result<Tx, Err> + Send + Sync + 'static,
 {
     type RawTx = RawTx;
@@ -130,43 +134,70 @@ where
     type Recovered = <Self::Convert as TxConverter<TxEnvFor<Evm>, TxTy<Evm::Primitives>>>::Recovered;
 }
 
+/// Wraps `Either` to implement both [`IntoParallelIterator`] and [`IntoIterator`],
+/// mapping items through [`Either::Left`] / [`Either::Right`] on demand without collecting.
+#[derive(Debug)]
+pub struct EitherIter<L, R>(Either<L, R>);
+
+impl<L, R> IntoParallelIterator for EitherIter<L, R>
+where
+    L: IntoParallelIterator,
+    R: IntoParallelIterator,
+    L::Iter: IndexedParallelIterator,
+    R::Iter: IndexedParallelIterator,
+{
+    type Item = Either<L::Item, R::Item>;
+    type Iter = Either<
+        rayon::iter::Map<L::Iter, fn(L::Item) -> Either<L::Item, R::Item>>,
+        rayon::iter::Map<R::Iter, fn(R::Item) -> Either<L::Item, R::Item>>,
+    >;
+
+    fn into_par_iter(self) -> Self::Iter {
+        match self.0 {
+            Either::Left(l) => Either::Left(l.into_par_iter().map(Either::Left)),
+            Either::Right(r) => Either::Right(r.into_par_iter().map(Either::Right)),
+        }
+    }
+}
+
+impl<L, R> IntoIterator for EitherIter<L, R>
+where
+    L: IntoIterator,
+    R: IntoIterator,
+{
+    type Item = Either<L::Item, R::Item>;
+    type IntoIter = Either<
+        core::iter::Map<L::IntoIter, fn(L::Item) -> Either<L::Item, R::Item>>,
+        core::iter::Map<R::IntoIter, fn(R::Item) -> Either<L::Item, R::Item>>,
+    >;
+
+    fn into_iter(self) -> Self::IntoIter {
+        match self.0 {
+            Either::Left(l) => Either::Left(l.into_iter().map(Either::Left)),
+            Either::Right(r) => Either::Right(r.into_iter().map(Either::Right)),
+        }
+    }
+}
+
+// SAFETY: `EitherIter` is a newtype over `Either`, which is `Send` whenever both
+// `L` and `R` are `Send`; the bounds below keep this impl sound.
+unsafe impl<L: Send, R: Send> Send for EitherIter<L, R> {}
+
 impl<A: ExecutableTxTuple, B: ExecutableTxTuple> ExecutableTxTuple for Either<A, B> {
     type RawTx = Either<A::RawTx, B::RawTx>;
     type Tx = Either<A::Tx, B::Tx>;
     type Error = Either<A::Error, B::Error>;
-    type IntoIter = Either<
-        rayon::iter::Map<
-            <A::IntoIter as IntoParallelIterator>::Iter,
-            fn(A::RawTx) -> Either<A::RawTx, B::RawTx>,
-        >,
-        rayon::iter::Map<
-            <B::IntoIter as IntoParallelIterator>::Iter,
-            fn(B::RawTx) -> Either<A::RawTx, B::RawTx>,
-        >,
-    >;
+    type IntoIter = EitherIter<A::IntoIter, B::IntoIter>;
     type Convert = Either<A::Convert, B::Convert>;
     fn into_parts(self) -> (Self::IntoIter, Self::Convert) {
         match self {
             Self::Left(a) => {
                 let (iter, convert) = a.into_parts();
-                (
-                    Either::Left(
-                        iter.into_par_iter()
-                            .map(Either::Left as fn(A::RawTx) -> Either<A::RawTx, B::RawTx>),
-                    ),
-                    Either::Left(convert),
-                )
+                (EitherIter(Either::Left(iter)), Either::Left(convert))
             }
             Self::Right(b) => {
                 let (iter, convert) = b.into_parts();
-                (
-                    Either::Right(
-                        iter.into_par_iter()
-                            .map(Either::Right as fn(B::RawTx) -> Either<A::RawTx, B::RawTx>),
-                    ),
-                    Either::Right(convert),
-                )
+                (EitherIter(Either::Right(iter)), Either::Right(convert))
             }
         }
     }
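
Note: the threshold dispatch above is easiest to see in isolation. The sketch below distills it, assuming only rayon; `recover_all` and its `recover` closure are hypothetical stand-ins for this illustration, not part of reth. The patch itself recovers ECDSA signatures and streams results through mpsc channels rather than collecting into a Vec, but the branch structure is the same: below the threshold the work runs inline on one thread, above it rayon amortizes its setup cost across enough items to win.

use rayon::prelude::*;

/// Mirrors `SMALL_BLOCK_TX_THRESHOLD` from the patch.
const SMALL_BLOCK_TX_THRESHOLD: usize = 30;

/// Hypothetical helper: applies `recover` to every item, picking the
/// sequential or parallel path by input size.
fn recover_all<T, R, F>(items: Vec<T>, recover: F) -> Vec<R>
where
    T: Send,
    R: Send,
    F: Fn(T) -> R + Send + Sync,
{
    if items.len() < SMALL_BLOCK_TX_THRESHOLD {
        // Sequential path: no thread-pool handoff, and results come out in
        // input order for free.
        items.into_iter().map(recover).collect()
    } else {
        // Parallel path: rayon's indexed iterators preserve input order on
        // collect, so no explicit reorder step is needed here.
        items.into_par_iter().map(recover).collect()
    }
}

fn main() {
    // Stand-in workload for per-transaction signature recovery.
    let recovered = recover_all((0u64..100).collect(), |x| x.wrapping_mul(0x9E37_79B9));
    assert_eq!(recovered.len(), 100);
}

The same size cutoff is the prior art the patch's doc comment cites: Nethermind's RecoverSignature step also falls back to a plain sequential loop for small blocks instead of paying parallel-iterator setup costs.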