refactor(engine): improve payload processor tx iterator (#21658)

Co-authored-by: Amp <amp@ampcode.com>
This commit is contained in:
Matthias Seitz
2026-02-01 13:44:10 +01:00
committed by GitHub
parent a9b2c1d454
commit 7d10e791b2
3 changed files with 18 additions and 26 deletions

View File

@@ -235,8 +235,7 @@ where
+ 'static,
{
// start preparing transactions immediately
let (prewarm_rx, execution_rx, transaction_count_hint) =
self.spawn_tx_iterator(transactions);
let (prewarm_rx, execution_rx) = self.spawn_tx_iterator(transactions);
let span = Span::current();
let (to_sparse_trie, sparse_trie_rx) = channel();
@@ -260,7 +259,6 @@ where
self.spawn_caching_with(
env,
prewarm_rx,
transaction_count_hint,
provider_builder.clone(),
None, // Don't send proof targets when BAL is present
Some(bal),
@@ -271,7 +269,6 @@ where
self.spawn_caching_with(
env,
prewarm_rx,
transaction_count_hint,
provider_builder.clone(),
Some(to_multi_proof.clone()),
None,
@@ -355,10 +352,10 @@ where
where
P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
{
let (prewarm_rx, execution_rx, size_hint) = self.spawn_tx_iterator(transactions);
let (prewarm_rx, execution_rx) = self.spawn_tx_iterator(transactions);
// This path doesn't use multiproof, so V2 proofs flag doesn't matter
let prewarm_handle =
self.spawn_caching_with(env, prewarm_rx, size_hint, provider_builder, None, bal, false);
self.spawn_caching_with(env, prewarm_rx, provider_builder, None, bal, false);
PayloadHandle {
to_multi_proof: None,
prewarm_handle,
@@ -376,19 +373,15 @@ where
) -> (
mpsc::Receiver<WithTxEnv<TxEnvFor<Evm>, I::Recovered>>,
mpsc::Receiver<Result<WithTxEnv<TxEnvFor<Evm>, I::Recovered>, I::Error>>,
usize,
) {
let (transactions, convert) = transactions.into();
let transactions = transactions.into_par_iter();
let transaction_count_hint = transactions.len();
let (ooo_tx, ooo_rx) = mpsc::channel();
let (prewarm_tx, prewarm_rx) = mpsc::channel();
let (execute_tx, execute_rx) = mpsc::channel();
// Spawn a task that `convert`s all transactions in parallel and sends them out-of-order.
self.executor.spawn_blocking(move || {
transactions.enumerate().for_each_with(ooo_tx, |ooo_tx, (idx, tx)| {
rayon::spawn(move || {
let (transactions, convert) = transactions.into();
transactions.into_par_iter().enumerate().for_each_with(ooo_tx, |ooo_tx, (idx, tx)| {
let tx = convert(tx);
let tx = tx.map(|tx| {
let (tx_env, tx) = tx.into_parts();
@@ -424,16 +417,14 @@ where
}
});
(prewarm_rx, execute_rx, transaction_count_hint)
(prewarm_rx, execute_rx)
}
/// Spawn prewarming optionally wired to the multiproof task for target updates.
#[expect(clippy::too_many_arguments)]
fn spawn_caching_with<P>(
&self,
env: ExecutionEnv<Evm>,
mut transactions: mpsc::Receiver<impl ExecutableTxFor<Evm> + Clone + Send + 'static>,
transaction_count_hint: usize,
provider_builder: StateProviderBuilder<N, P>,
to_multi_proof: Option<CrossbeamSender<MultiProofMessage>>,
bal: Option<Arc<BlockAccessList>>,
@@ -468,7 +459,6 @@ where
self.execution_cache.clone(),
prewarm_ctx,
to_multi_proof,
transaction_count_hint,
self.prewarm_max_concurrency,
);
@@ -961,6 +951,10 @@ pub struct ExecutionEnv<Evm: ConfigureEvm> {
/// Used for sparse trie continuation: if the preserved trie's anchor matches this,
/// the trie can be reused directly.
pub parent_state_root: B256,
/// Number of transactions in the block.
/// Used to determine parallel worker count for prewarming.
/// A value of 0 indicates the count is unknown.
pub transaction_count: usize,
}
impl<Evm: ConfigureEvm> Default for ExecutionEnv<Evm>
@@ -973,6 +967,7 @@ where
hash: Default::default(),
parent_hash: Default::default(),
parent_state_root: Default::default(),
transaction_count: 0,
}
}
}

View File

@@ -84,8 +84,6 @@ where
ctx: PrewarmContext<N, P, Evm>,
/// How many transactions should be executed in parallel
max_concurrency: usize,
/// The number of transactions to be processed
transaction_count_hint: usize,
/// Sender to emit evm state outcome messages, if any.
to_multi_proof: Option<CrossbeamSender<MultiProofMessage>>,
/// Receiver for events produced by tx execution
@@ -106,7 +104,6 @@ where
execution_cache: PayloadExecutionCache,
ctx: PrewarmContext<N, P, Evm>,
to_multi_proof: Option<CrossbeamSender<MultiProofMessage>>,
transaction_count_hint: usize,
max_concurrency: usize,
) -> (Self, Sender<PrewarmTaskEvent<N::Receipt>>) {
let (actions_tx, actions_rx) = channel();
@@ -114,7 +111,7 @@ where
trace!(
target: "engine::tree::payload_processor::prewarm",
max_concurrency,
transaction_count_hint,
transaction_count = ctx.env.transaction_count,
"Initialized prewarm task"
);
@@ -124,7 +121,6 @@ where
execution_cache,
ctx,
max_concurrency,
transaction_count_hint,
to_multi_proof,
actions_rx,
parent_span: Span::current(),
@@ -148,7 +144,6 @@ where
let executor = self.executor.clone();
let ctx = self.ctx.clone();
let max_concurrency = self.max_concurrency;
let transaction_count_hint = self.transaction_count_hint;
let span = Span::current();
self.executor.spawn_blocking(move || {
@@ -156,13 +151,14 @@ where
let (done_tx, done_rx) = mpsc::channel();
// When transaction_count_hint is 0, it means the count is unknown. In this case, spawn
// When transaction_count is 0, it means the count is unknown. In this case, spawn
// max workers to handle potentially many transactions in parallel rather
// than bottlenecking on a single worker.
let workers_needed = if transaction_count_hint == 0 {
let transaction_count = ctx.env.transaction_count;
let workers_needed = if transaction_count == 0 {
max_concurrency
} else {
transaction_count_hint.min(max_concurrency)
transaction_count.min(max_concurrency)
};
// Spawn workers

View File

@@ -407,6 +407,7 @@ where
hash: input.hash(),
parent_hash: input.parent_hash(),
parent_state_root: parent_block.state_root(),
transaction_count: input.transaction_count(),
};
// Plan the strategy used for state root computation.