perf: spawn prewarming transactions in chunks (#15155)

Co-authored-by: Federico Gimenez <federico.gimenez@gmail.com>
This commit is contained in:
Dan Cline
2025-04-07 10:00:34 -04:00
committed by GitHub
parent abd8981310
commit 4dc1b5c907
3 changed files with 44 additions and 79 deletions

View File

@@ -60,6 +60,7 @@ rayon.workspace = true
tracing.workspace = true
derive_more.workspace = true
parking_lot.workspace = true
itertools.workspace = true
# optional deps for test-utils
reth-prune-types = { workspace = true, optional = true }

View File

@@ -10,6 +10,7 @@ use crate::tree::{
use alloy_consensus::transaction::Recovered;
use alloy_evm::Database;
use alloy_primitives::{keccak256, map::B256Set, B256};
use itertools::Itertools;
use metrics::{Gauge, Histogram};
use reth_evm::{ConfigureEvm, Evm, EvmFor};
use reth_metrics::Metrics;
@@ -37,8 +38,6 @@ pub(super) struct PrewarmCacheTask<N: NodePrimitives, P, Evm> {
pending: VecDeque<Recovered<N::SignedTx>>,
/// Context provided to execution tasks
ctx: PrewarmContext<N, P, Evm>,
/// How many txs are currently in progress
in_progress: usize,
/// How many transactions should be executed in parallel
max_concurrency: usize,
/// Sender to emit evm state outcome messages, if any.
@@ -69,7 +68,6 @@ where
execution_cache,
pending,
ctx,
in_progress: 0,
max_concurrency: 64,
to_multi_proof,
actions_rx,
@@ -82,47 +80,21 @@ where
self.actions_tx.clone()
}
/// Spawns the next transactions
fn spawn_next(&mut self) {
while self.in_progress < self.max_concurrency {
if let Some(tx) = self.pending.pop_front() {
// increment the in progress counter
self.in_progress += 1;
/// Spawns all pending transactions as blocking tasks by first chunking them.
fn spawn_all(&mut self) {
let chunk_size = (self.pending.len() / self.max_concurrency).max(1);
self.spawn_transaction(tx);
} else {
break
}
for chunk in &self.pending.drain(..).chunks(chunk_size) {
let sender = self.actions_tx.clone();
let ctx = self.ctx.clone();
let pending_chunk = chunk.collect::<Vec<_>>();
self.executor.spawn_blocking(move || {
ctx.transact_batch(&pending_chunk, sender);
});
}
}
/// Spawns the given transaction as a blocking task.
fn spawn_transaction(&self, tx: Recovered<N::SignedTx>) {
let ctx = self.ctx.clone();
let metrics = self.ctx.metrics.clone();
let actions_tx = self.actions_tx.clone();
let prepare_proof_targets = self.should_prepare_multi_proof_targets();
self.executor.spawn_blocking(move || {
let start = Instant::now();
// depending on whether this task needs he proof targets we either just transact or
// transact and prepare the targets
let proof_targets = if prepare_proof_targets {
ctx.prepare_multiproof_targets(tx)
} else {
ctx.transact(tx);
None
};
let _ = actions_tx.send(PrewarmTaskEvent::Outcome { proof_targets });
metrics.total_runtime.record(start.elapsed());
});
}
/// Returns true if the tx prewarming tasks should prepare multiproof targets.
fn should_prepare_multi_proof_targets(&self) -> bool {
self.to_multi_proof.is_some()
}
/// If configured and the tx returned proof targets, emit the targets the transaction produced
fn send_multi_proof_targets(&self, targets: Option<MultiProofTargets>) {
if let Some((proof_targets, to_multi_proof)) = targets.zip(self.to_multi_proof.as_ref()) {
@@ -160,7 +132,7 @@ where
self.ctx.metrics.transactions_histogram.record(self.pending.len() as f64);
// spawn execution tasks.
self.spawn_next();
self.spawn_all();
while let Ok(event) = self.actions_rx.recv() {
match event {
@@ -170,7 +142,6 @@ where
}
PrewarmTaskEvent::Outcome { proof_targets } => {
// completed a transaction, frees up one slot
self.in_progress -= 1;
self.send_multi_proof_targets(proof_targets);
}
PrewarmTaskEvent::Terminate { block_output } => {
@@ -182,9 +153,6 @@ where
break
}
}
// schedule followup transactions
self.spawn_next();
}
}
}
@@ -207,17 +175,6 @@ where
P: BlockReader + StateProviderFactory + StateReader + StateCommitmentProvider + Clone + 'static,
Evm: ConfigureEvm<Primitives = N> + 'static,
{
/// Transacts the transactions and transform the state into [`MultiProofTargets`].
fn prepare_multiproof_targets(self, tx: Recovered<N::SignedTx>) -> Option<MultiProofTargets> {
let metrics = self.metrics.clone();
let state = self.transact(tx)?;
let (targets, storage_targets) = multiproof_targets_from_state(state);
metrics.prefetch_storage_targets.record(storage_targets as f64);
Some(targets)
}
/// Splits this context into an evm, an evm config, and metrics.
fn evm_for_ctx(self) -> Option<(EvmFor<Evm, impl Database>, Evm, PrewarmMetrics)> {
let Self { header, evm_config, cache: caches, cache_metrics, provider, metrics } = self;
@@ -252,35 +209,41 @@ where
Some((evm, evm_config, metrics))
}
/// Transacts the transaction and returns the state outcome.
/// Transacts the vec of transactions and returns the state outcome.
///
/// Returns `None` if executing the transaction failed to a non Revert error.
/// Returns `None` if executing the transactions failed to a non Revert error.
/// Returns the touched+modified state of the transaction.
///
/// Note: Since here are no ordering guarantees this won't the state the tx produces when
/// Note: Since here are no ordering guarantees this won't the state the txs produce when
/// executed sequentially.
fn transact(self, tx: Recovered<N::SignedTx>) -> Option<EvmState> {
let (mut evm, evm_config, metrics) = self.evm_for_ctx()?;
fn transact_batch(self, txs: &[Recovered<N::SignedTx>], sender: Sender<PrewarmTaskEvent>) {
let Some((mut evm, evm_config, metrics)) = self.evm_for_ctx() else { return };
// create the tx env and reset nonce
let tx_env = evm_config.tx_env(&tx);
let start = Instant::now();
let res = match evm.transact(tx_env) {
Ok(res) => res,
Err(err) => {
trace!(
target: "engine::tree",
%err,
tx_hash=%tx.tx_hash(),
sender=%tx.signer(),
"Error when executing prewarm transaction",
);
return None
}
};
metrics.execution_duration.record(start.elapsed());
for tx in txs {
// create the tx env
let tx_env = evm_config.tx_env(tx);
let start = Instant::now();
let res = match evm.transact(tx_env) {
Ok(res) => res,
Err(err) => {
trace!(
target: "engine::tree",
%err,
tx_hash=%tx.tx_hash(),
sender=%tx.signer(),
"Error when executing prewarm transaction",
);
return
}
};
metrics.execution_duration.record(start.elapsed());
Some(res.state)
let (targets, storage_targets) = multiproof_targets_from_state(res.state);
metrics.prefetch_storage_targets.record(storage_targets as f64);
metrics.total_runtime.record(start.elapsed());
let _ = sender.send(PrewarmTaskEvent::Outcome { proof_targets: Some(targets) });
}
}
}