perf(engine): use sequential sig recovery for small blocks (#22077)

Co-authored-by: Amp <amp@ampcode.com>
Co-authored-by: Ubuntu <ubuntu@dev-yk.tail388b2e.ts.net>
Co-authored-by: YK <chiayongkang@hotmail.com>
Georgios Konstantopoulos authored on 2026-02-12 10:06:21 -05:00; committed by GitHub
parent dc4f249f09
commit 4a62d38af2
2 changed files with 115 additions and 41 deletions


@@ -235,7 +235,8 @@ where
         + 'static,
     {
         // start preparing transactions immediately
-        let (prewarm_rx, execution_rx) = self.spawn_tx_iterator(transactions);
+        let (prewarm_rx, execution_rx) =
+            self.spawn_tx_iterator(transactions, env.transaction_count);

         let span = Span::current();
         let (to_sparse_trie, sparse_trie_rx) = channel();
@@ -342,7 +343,8 @@ where
     where
         P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
     {
-        let (prewarm_rx, execution_rx) = self.spawn_tx_iterator(transactions);
+        let (prewarm_rx, execution_rx) =
+            self.spawn_tx_iterator(transactions, env.transaction_count);
         // This path doesn't use multiproof, so V2 proofs flag doesn't matter
         let prewarm_handle =
             self.spawn_caching_with(env, prewarm_rx, provider_builder, None, bal, false);
@@ -355,11 +357,23 @@ where
         }
     }

+    /// Transaction count threshold below which sequential signature recovery is used.
+    ///
+    /// For blocks with fewer than this many transactions, the rayon parallel iterator overhead
+    /// (work-stealing setup, channel-based reorder) exceeds the cost of sequential ECDSA
+    /// recovery. Inspired by Nethermind's `RecoverSignature`, which uses a sequential
+    /// `foreach` for small blocks.
+    const SMALL_BLOCK_TX_THRESHOLD: usize = 30;
+
     /// Spawns a task advancing transaction env iterator and streaming updates through a channel.
+    ///
+    /// For blocks with fewer than [`Self::SMALL_BLOCK_TX_THRESHOLD`] transactions, uses
+    /// sequential iteration to avoid rayon overhead.
     #[expect(clippy::type_complexity)]
     fn spawn_tx_iterator<I: ExecutableTxIterator<Evm>>(
         &self,
         transactions: I,
+        transaction_count: usize,
     ) -> (
         mpsc::Receiver<WithTxEnv<TxEnvFor<Evm>, I::Recovered>>,
         mpsc::Receiver<Result<WithTxEnv<TxEnvFor<Evm>, I::Recovered>, I::Error>>,
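Aside: the threshold trades rayon's fixed scheduling cost against per-transaction work. A minimal standalone sketch of the dispatch pattern (not reth code; `recover` is a stand-in for ECDSA signature recovery, and the constant merely mirrors the one above):

    use rayon::prelude::*;

    const SMALL_BLOCK_TX_THRESHOLD: usize = 30;

    /// Stand-in for per-transaction ECDSA recovery work.
    fn recover(raw: &[u8]) -> u64 {
        raw.iter().map(|&b| b as u64).sum()
    }

    /// Recovers all "signatures", choosing the strategy by input size.
    fn recover_all(txs: &[Vec<u8>]) -> Vec<u64> {
        if txs.len() < SMALL_BLOCK_TX_THRESHOLD {
            // Sequential: skips rayon's work-stealing setup; output is naturally in order.
            txs.iter().map(|tx| recover(tx)).collect()
        } else {
            // Parallel: rayon's indexed parallel iterator preserves input order on collect.
            txs.par_iter().map(|tx| recover(tx)).collect()
        }
    }

    fn main() {
        let block: Vec<Vec<u8>> = (0..10u8).map(|i| vec![i; 4]).collect();
        assert_eq!(recover_all(&block).len(), 10);
    }

The break-even value is empirical: the cheaper each item is, the higher the count has to be before parallelism pays for itself.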
@@ -368,22 +382,51 @@ where
         let (prewarm_tx, prewarm_rx) = mpsc::channel();
         let (execute_tx, execute_rx) = mpsc::channel();

-        // Spawn a task that `convert`s all transactions in parallel and sends them out-of-order.
-        rayon::spawn(move || {
-            let (transactions, convert) = transactions.into_parts();
-            transactions.into_par_iter().enumerate().for_each_with(ooo_tx, |ooo_tx, (idx, tx)| {
-                let tx = convert.convert(tx);
-                let tx = tx.map(|tx| {
-                    let (tx_env, tx) = tx.into_parts();
-                    WithTxEnv { tx_env, tx: Arc::new(tx) }
-                });
-                // Only send Ok(_) variants to prewarming task.
-                if let Ok(tx) = &tx {
-                    let _ = prewarm_tx.send(tx.clone());
-                }
-                let _ = ooo_tx.send((idx, tx));
-            });
-        });
+        if transaction_count == 0 {
+            // Empty block — nothing to do.
+        } else if transaction_count < Self::SMALL_BLOCK_TX_THRESHOLD {
+            // Sequential path for small blocks — avoids rayon work-stealing setup and
+            // channel-based reorder overhead when it costs more than the ECDSA recovery itself.
+            debug!(
+                target: "engine::tree::payload_processor",
+                transaction_count,
+                "using sequential sig recovery for small block"
+            );
+            self.executor.spawn_blocking(move || {
+                let (transactions, convert) = transactions.into_parts();
+                for (idx, tx) in transactions.into_iter().enumerate() {
+                    let tx = convert.convert(tx);
+                    let tx = tx.map(|tx| {
+                        let (tx_env, tx) = tx.into_parts();
+                        WithTxEnv { tx_env, tx: Arc::new(tx) }
+                    });
+                    if let Ok(tx) = &tx {
+                        let _ = prewarm_tx.send(tx.clone());
+                    }
+                    let _ = ooo_tx.send((idx, tx));
+                }
+            });
+        } else {
+            // Parallel path — spawn on rayon for parallel signature recovery.
+            rayon::spawn(move || {
+                let (transactions, convert) = transactions.into_parts();
+                transactions.into_par_iter().enumerate().for_each_with(
+                    ooo_tx,
+                    |ooo_tx, (idx, tx)| {
+                        let tx = convert.convert(tx);
+                        let tx = tx.map(|tx| {
+                            let (tx_env, tx) = tx.into_parts();
+                            WithTxEnv { tx_env, tx: Arc::new(tx) }
+                        });
+                        // Only send Ok(_) variants to prewarming task.
+                        if let Ok(tx) = &tx {
+                            let _ = prewarm_tx.send(tx.clone());
+                        }
+                        let _ = ooo_tx.send((idx, tx));
+                    },
+                );
+            });
+        }

         // Spawn a task that processes out-of-order transactions from the task above and sends them
         // to the execution task in order.
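Both branches above feed the same downstream stage: conversion results are sent as `(idx, tx)` pairs in completion order, and the in-order task reassembles them before execution. A minimal sketch of that reorder step (illustrative only; `reorder` and its signature are not from this diff):

    use std::collections::BTreeMap;
    use std::sync::mpsc;

    /// Receives `(index, value)` pairs in arbitrary order and emits values in index order.
    fn reorder<T>(rx: mpsc::Receiver<(usize, T)>, mut emit: impl FnMut(T)) {
        let mut next = 0usize;
        let mut pending = BTreeMap::new();
        for (idx, value) in rx {
            pending.insert(idx, value);
            // Release everything now contiguous with what was already emitted.
            while let Some(value) = pending.remove(&next) {
                emit(value);
                next += 1;
            }
        }
    }

    fn main() {
        let (tx, rx) = mpsc::channel();
        // Simulate parallel workers finishing out of order.
        for idx in [2usize, 0, 1] {
            tx.send((idx, idx * 10)).unwrap();
        }
        drop(tx); // close the channel so the loop terminates
        let mut out = Vec::new();
        reorder(rx, |v| out.push(v));
        assert_eq!(out, vec![0, 10, 20]);
    }

The remaining hunks are in the second changed file, which widens the transaction-iterator abstraction so the sequential path above can exist.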


@@ -83,6 +83,7 @@ pub trait ExecutableTxTuple: Send + 'static {
     /// Iterator over [`ExecutableTxTuple::Tx`].
     type IntoIter: IntoParallelIterator<Item = Self::RawTx, Iter: IndexedParallelIterator>
+        + IntoIterator<Item = Self::RawTx>
         + Send
         + 'static;

     /// Converter that can be used to convert a [`ExecutableTxTuple::RawTx`] to a
@@ -99,7 +100,10 @@ where
     RawTx: Send + Sync + 'static,
     Tx: Clone + Send + Sync + 'static,
     Err: core::error::Error + Send + Sync + 'static,
-    I: IntoParallelIterator<Item = RawTx, Iter: IndexedParallelIterator> + Send + 'static,
+    I: IntoParallelIterator<Item = RawTx, Iter: IndexedParallelIterator>
+        + IntoIterator<Item = RawTx>
+        + Send
+        + 'static,
     F: Fn(RawTx) -> Result<Tx, Err> + Send + Sync + 'static,
 {
     type RawTx = RawTx;
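With the widened bound, a single generic consumer can pick `into_iter` or `into_par_iter` on the same input at runtime, which is exactly what `spawn_tx_iterator` needs. A sketch under the same bounds (the hypothetical `convert_all` is not part of the diff):

    use rayon::prelude::*;

    /// Consumes `items` either sequentially or in parallel, depending on `parallel`.
    fn convert_all<I, T>(items: I, parallel: bool) -> Vec<T>
    where
        I: IntoParallelIterator<Item = T, Iter: IndexedParallelIterator>
            + IntoIterator<Item = T>
            + Send,
        T: Send,
    {
        if parallel {
            // IntoParallelIterator path: rayon splits the work across threads.
            items.into_par_iter().collect()
        } else {
            // IntoIterator path: plain sequential iteration, no scheduling overhead.
            items.into_iter().collect()
        }
    }

    fn main() {
        let v: Vec<u64> = (0..100).collect();
        assert_eq!(convert_all(v.clone(), false), convert_all(v, true));
    }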
@@ -130,43 +134,70 @@ where
     type Recovered = <T::Tx as ExecutableTxParts<TxEnvFor<Evm>, TxTy<Evm::Primitives>>>::Recovered;
 }

+/// Wraps `Either<L, R>` to implement both [`IntoParallelIterator`] and [`IntoIterator`],
+/// mapping items through [`Either::Left`] / [`Either::Right`] on demand without collecting.
+#[derive(Debug)]
+pub struct EitherIter<L, R>(Either<L, R>);
+
+impl<L, R> IntoParallelIterator for EitherIter<L, R>
+where
+    L: IntoParallelIterator,
+    R: IntoParallelIterator,
+    L::Iter: IndexedParallelIterator,
+    R::Iter: IndexedParallelIterator,
+{
+    type Item = Either<L::Item, R::Item>;
+    type Iter = Either<
+        rayon::iter::Map<L::Iter, fn(L::Item) -> Either<L::Item, R::Item>>,
+        rayon::iter::Map<R::Iter, fn(R::Item) -> Either<L::Item, R::Item>>,
+    >;
+
+    fn into_par_iter(self) -> Self::Iter {
+        match self.0 {
+            Either::Left(l) => Either::Left(l.into_par_iter().map(Either::Left)),
+            Either::Right(r) => Either::Right(r.into_par_iter().map(Either::Right)),
+        }
+    }
+}
+
+impl<L, R> IntoIterator for EitherIter<L, R>
+where
+    L: IntoIterator,
+    R: IntoIterator,
+{
+    type Item = Either<L::Item, R::Item>;
+    type IntoIter = Either<
+        core::iter::Map<L::IntoIter, fn(L::Item) -> Either<L::Item, R::Item>>,
+        core::iter::Map<R::IntoIter, fn(R::Item) -> Either<L::Item, R::Item>>,
+    >;
+
+    fn into_iter(self) -> Self::IntoIter {
+        match self.0 {
+            Either::Left(l) => Either::Left(l.into_iter().map(Either::Left)),
+            Either::Right(r) => Either::Right(r.into_iter().map(Either::Right)),
+        }
+    }
+}
+
+// SAFETY: `EitherIter` is just a newtype over `Either<L, R>`.
+unsafe impl<L: Send, R: Send> Send for EitherIter<L, R> {}
+
 impl<A: ExecutableTxTuple, B: ExecutableTxTuple> ExecutableTxTuple for Either<A, B> {
     type RawTx = Either<A::RawTx, B::RawTx>;
     type Tx = Either<A::Tx, B::Tx>;
     type Error = Either<A::Error, B::Error>;
-    type IntoIter = Either<
-        rayon::iter::Map<
-            <A::IntoIter as IntoParallelIterator>::Iter,
-            fn(A::RawTx) -> Either<A::RawTx, B::RawTx>,
-        >,
-        rayon::iter::Map<
-            <B::IntoIter as IntoParallelIterator>::Iter,
-            fn(B::RawTx) -> Either<A::RawTx, B::RawTx>,
-        >,
-    >;
+    type IntoIter = EitherIter<A::IntoIter, B::IntoIter>;
     type Convert = Either<A::Convert, B::Convert>;

     fn into_parts(self) -> (Self::IntoIter, Self::Convert) {
         match self {
             Self::Left(a) => {
                 let (iter, convert) = a.into_parts();
-                (
-                    Either::Left(
-                        iter.into_par_iter()
-                            .map(Either::Left as fn(A::RawTx) -> Either<A::RawTx, B::RawTx>),
-                    ),
-                    Either::Left(convert),
-                )
+                (EitherIter(Either::Left(iter)), Either::Left(convert))
             }
             Self::Right(b) => {
                 let (iter, convert) = b.into_parts();
-                (
-                    Either::Right(
-                        iter.into_par_iter()
-                            .map(Either::Right as fn(B::RawTx) -> Either<A::RawTx, B::RawTx>),
-                    ),
-                    Either::Right(convert),
-                )
+                (EitherIter(Either::Right(iter)), Either::Right(convert))
             }
         }
     }
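For context on the `EitherIter` design: `either`'s `Either<L, R>` only implements `Iterator` (and rayon's `ParallelIterator`) when both arms yield the same item type, so each arm is lazily mapped into the shared `Either<L::Item, R::Item>` rather than collected. A standalone sketch of that mapping trick on the sequential side (assumes the `either` crate; the concrete types are illustrative):

    use either::Either;

    fn main() {
        // Two differently-typed sources behind one `Either`.
        let source: Either<std::ops::Range<u32>, std::vec::IntoIter<&str>> = Either::Left(0..3);

        // `Either<L, R>` is only an Iterator when both arms share an item type,
        // so map each arm into the common `Either<u32, &str>` first.
        let unified = match source {
            Either::Left(l) => Either::Left(l.map(Either::Left)),
            Either::Right(r) => Either::Right(r.map(Either::Right)),
        };

        let items: Vec<Either<u32, &str>> = unified.collect();
        assert_eq!(items, vec![Either::Left(0), Either::Left(1), Either::Left(2)]);
    }

The `fn(...)` pointer types in the associated `Iter`/`IntoIter` exist because closures have unnameable types; casting `Either::Left` to a fn pointer keeps the associated types writable.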