perf: optimize Optimism deposit transaction prewarming (#18327)

This commit is contained in:
YK
2025-09-29 17:06:17 +08:00
committed by GitHub
parent 48b725aec2
commit ec4e6aafde
3 changed files with 198 additions and 39 deletions

View File

@@ -17,6 +17,9 @@ pub const DEFAULT_MULTIPROOF_TASK_CHUNK_SIZE: usize = 10;
/// This will be deducted from the thread count of main reth global threadpool.
pub const DEFAULT_RESERVED_CPU_CORES: usize = 1;
/// Default maximum concurrency for prewarm task.
pub const DEFAULT_PREWARM_MAX_CONCURRENCY: usize = 16;
const DEFAULT_BLOCK_BUFFER_LIMIT: u32 = 256;
const DEFAULT_MAX_INVALID_HEADER_CACHE_LENGTH: u32 = 256;
const DEFAULT_MAX_EXECUTE_BLOCK_BATCH_SIZE: usize = 4;
@@ -102,6 +105,8 @@ pub struct TreeConfig {
/// where immediate payload regeneration is desired despite the head not changing or moving to
/// an ancestor.
always_process_payload_attributes_on_canonical_head: bool,
/// Maximum concurrency for the prewarm task.
prewarm_max_concurrency: usize,
/// Whether to unwind canonical header to ancestor during forkchoice updates.
allow_unwind_canonical_header: bool,
}
@@ -128,6 +133,7 @@ impl Default for TreeConfig {
precompile_cache_disabled: false,
state_root_fallback: false,
always_process_payload_attributes_on_canonical_head: false,
prewarm_max_concurrency: DEFAULT_PREWARM_MAX_CONCURRENCY,
allow_unwind_canonical_header: false,
}
}
@@ -156,6 +162,7 @@ impl TreeConfig {
precompile_cache_disabled: bool,
state_root_fallback: bool,
always_process_payload_attributes_on_canonical_head: bool,
prewarm_max_concurrency: usize,
allow_unwind_canonical_header: bool,
) -> Self {
Self {
@@ -178,6 +185,7 @@ impl TreeConfig {
precompile_cache_disabled,
state_root_fallback,
always_process_payload_attributes_on_canonical_head,
prewarm_max_concurrency,
allow_unwind_canonical_header,
}
}
@@ -433,4 +441,15 @@ impl TreeConfig {
pub const fn use_state_root_task(&self) -> bool {
self.has_enough_parallelism && !self.legacy_state_root
}
/// Builder-style setter that overrides the maximum concurrency used by the
/// prewarm task, returning the updated config.
pub const fn with_prewarm_max_concurrency(mut self, max_concurrency: usize) -> Self {
    self.prewarm_max_concurrency = max_concurrency;
    self
}
/// Returns the configured maximum concurrency for the prewarm task.
pub const fn prewarm_max_concurrency(&self) -> usize {
    self.prewarm_max_concurrency
}
}

View File

@@ -96,6 +96,8 @@ where
disable_parallel_sparse_trie: bool,
/// A cleared trie input, kept around to be reused so allocations can be minimized.
trie_input: Option<TrieInput>,
/// Maximum concurrency for prewarm task.
prewarm_max_concurrency: usize,
}
impl<N, Evm> PayloadProcessor<Evm>
@@ -122,6 +124,7 @@ where
sparse_state_trie: Arc::default(),
trie_input: None,
disable_parallel_sparse_trie: config.disable_parallel_sparse_trie(),
prewarm_max_concurrency: config.prewarm_max_concurrency(),
}
}
}
@@ -215,10 +218,16 @@ where
// wire the multiproof task to the prewarm task
let to_multi_proof = Some(multi_proof_task.state_root_message_sender());
let (prewarm_rx, execution_rx) = self.spawn_tx_iterator(transactions);
let (prewarm_rx, execution_rx, transaction_count_hint) =
self.spawn_tx_iterator(transactions);
let prewarm_handle =
self.spawn_caching_with(env, prewarm_rx, provider_builder, to_multi_proof.clone());
let prewarm_handle = self.spawn_caching_with(
env,
prewarm_rx,
transaction_count_hint,
provider_builder,
to_multi_proof.clone(),
);
// spawn multi-proof task
self.executor.spawn_blocking(move || {
@@ -263,8 +272,9 @@ where
where
P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
{
let (prewarm_rx, execution_rx) = self.spawn_tx_iterator(transactions);
let prewarm_handle = self.spawn_caching_with(env, prewarm_rx, provider_builder, None);
let (prewarm_rx, execution_rx, size_hint) = self.spawn_tx_iterator(transactions);
let prewarm_handle =
self.spawn_caching_with(env, prewarm_rx, size_hint, provider_builder, None);
PayloadHandle {
to_multi_proof: None,
prewarm_handle,
@@ -281,7 +291,13 @@ where
) -> (
mpsc::Receiver<WithTxEnv<TxEnvFor<Evm>, I::Tx>>,
mpsc::Receiver<Result<WithTxEnv<TxEnvFor<Evm>, I::Tx>, I::Error>>,
usize,
) {
// Get the transaction count for prewarming task
// Use upper bound if available (more accurate), otherwise use lower bound
let (lower, upper) = transactions.size_hint();
let transaction_count_hint = upper.unwrap_or(lower);
let (prewarm_tx, prewarm_rx) = mpsc::channel();
let (execute_tx, execute_rx) = mpsc::channel();
self.executor.spawn_blocking(move || {
@@ -295,14 +311,15 @@ where
}
});
(prewarm_rx, execute_rx)
(prewarm_rx, execute_rx, transaction_count_hint)
}
/// Spawn prewarming optionally wired to the multiproof task for target updates.
fn spawn_caching_with<P>(
&self,
env: ExecutionEnv<Evm>,
mut transactions: mpsc::Receiver<impl ExecutableTxFor<Evm> + Send + 'static>,
mut transactions: mpsc::Receiver<impl ExecutableTxFor<Evm> + Clone + Send + 'static>,
transaction_count_hint: usize,
provider_builder: StateProviderBuilder<N, P>,
to_multi_proof: Option<Sender<MultiProofMessage>>,
) -> CacheTaskHandle
@@ -335,6 +352,8 @@ where
self.execution_cache.clone(),
prewarm_ctx,
to_multi_proof,
transaction_count_hint,
self.prewarm_max_concurrency,
);
// spawn pre-warm task

View File

@@ -21,6 +21,7 @@ use crate::tree::{
ExecutionEnv, StateProviderBuilder,
};
use alloy_consensus::transaction::TxHashRef;
use alloy_eips::Typed2718;
use alloy_evm::Database;
use alloy_primitives::{keccak256, map::B256Set, B256};
use metrics::{Counter, Gauge, Histogram};
@@ -38,7 +39,29 @@ use std::{
},
time::Instant,
};
use tracing::{debug, trace};
use tracing::{debug, trace, warn};
/// Highest transaction type value (EIP-2718 `ty`) that still counts as a
/// standard Ethereum transaction.
///
/// Standard types:
/// - 0: legacy transactions
/// - 1: EIP-2930 access-list transactions
/// - 2: EIP-1559 dynamic-fee transactions
/// - 3: EIP-4844 blob transactions
/// - 4: EIP-7702 set-code authorization transactions
///
/// Any type greater than 4 is treated as a non-standard/system transaction,
/// typically used by L2s for special purposes (e.g. Optimism deposit
/// transactions use type 126 / 0x7E).
const MAX_STANDARD_TX_TYPE: u8 = 4;

/// Pairs a transaction with its position in the block, so workers can
/// distinguish the first (potentially system/deposit) transaction from the
/// rest.
#[derive(Clone)]
struct IndexedTransaction<Tx> {
    /// Zero-based position of the transaction within the block.
    index: usize,
    /// The wrapped transaction.
    tx: Tx,
}
/// A task that is responsible for caching and prewarming the cache by executing transactions
/// individually in parallel.
@@ -57,6 +80,8 @@ where
ctx: PrewarmContext<N, P, Evm>,
/// How many transactions should be executed in parallel
max_concurrency: usize,
/// The number of transactions to be processed
transaction_count_hint: usize,
/// Sender to emit evm state outcome messages, if any.
to_multi_proof: Option<Sender<MultiProofMessage>>,
/// Receiver for events produced by tx execution
@@ -75,14 +100,25 @@ where
execution_cache: PayloadExecutionCache,
ctx: PrewarmContext<N, P, Evm>,
to_multi_proof: Option<Sender<MultiProofMessage>>,
transaction_count_hint: usize,
max_concurrency: usize,
) -> (Self, Sender<PrewarmTaskEvent>) {
let (actions_tx, actions_rx) = channel();
trace!(
target: "engine::tree::prewarm",
max_concurrency,
transaction_count_hint,
"Initialized prewarm task"
);
(
Self {
executor,
execution_cache,
ctx,
max_concurrency: 64,
max_concurrency,
transaction_count_hint,
to_multi_proof,
actions_rx,
},
@@ -91,38 +127,97 @@ where
}
/// Spawns all pending transactions as blocking tasks by first chunking them.
fn spawn_all(
&self,
pending: mpsc::Receiver<impl ExecutableTxFor<Evm> + Send + 'static>,
actions_tx: Sender<PrewarmTaskEvent>,
) {
///
/// For Optimism chains, special handling is applied to the first transaction if it's a
/// deposit transaction (type 0x7E/126) which sets critical metadata that affects all
/// subsequent transactions in the block.
fn spawn_all<Tx>(&self, pending: mpsc::Receiver<Tx>, actions_tx: Sender<PrewarmTaskEvent>)
where
Tx: ExecutableTxFor<Evm> + Clone + Send + 'static,
{
let executor = self.executor.clone();
let ctx = self.ctx.clone();
let max_concurrency = self.max_concurrency;
let transaction_count_hint = self.transaction_count_hint;
self.executor.spawn_blocking(move || {
let mut handles = Vec::with_capacity(max_concurrency);
let (done_tx, done_rx) = mpsc::channel();
let mut executing = 0;
while let Ok(executable) = pending.recv() {
let task_idx = executing % max_concurrency;
let mut executing = 0usize;
if handles.len() <= task_idx {
let (tx, rx) = mpsc::channel();
let sender = actions_tx.clone();
let ctx = ctx.clone();
let done_tx = done_tx.clone();
// Initialize worker handles container
let mut handles = Vec::with_capacity(max_concurrency);
executor.spawn_blocking(move || {
ctx.transact_batch(rx, sender, done_tx);
});
// When transaction_count_hint is 0, it means the count is unknown. In this case, spawn
// max workers to handle potentially many transactions in parallel rather
// than bottlenecking on a single worker.
let workers_needed = if transaction_count_hint == 0 {
max_concurrency
} else {
transaction_count_hint.min(max_concurrency)
};
handles.push(tx);
// Only spawn initial workers as needed
for _ in 0..workers_needed {
handles.push(ctx.spawn_worker(&executor, actions_tx.clone(), done_tx.clone()));
}
let mut tx_index = 0usize;
// Handle first transaction - special case for system transactions
if let Ok(first_tx) = pending.recv() {
// Move the transaction into the indexed wrapper to avoid an extra clone
let indexed_tx = IndexedTransaction { index: tx_index, tx: first_tx };
// Compute metadata from the moved value
let tx_ref = indexed_tx.tx.tx();
let is_system_tx = tx_ref.ty() > MAX_STANDARD_TX_TYPE;
let first_tx_hash = tx_ref.tx_hash();
// Check if this is a system transaction (type > 4)
// System transactions in the first position typically set critical metadata
// that affects all subsequent transactions (e.g., L1 block info, fees on L2s).
if is_system_tx {
// Broadcast system transaction to all workers to ensure they have the
// critical state. This is particularly important for L2s like Optimism
// where the first deposit transaction contains essential block metadata.
for handle in &handles {
if let Err(err) = handle.send(indexed_tx.clone()) {
warn!(
target: "engine::tree::prewarm",
tx_hash = %first_tx_hash,
error = %err,
"Failed to send deposit transaction to worker"
);
}
}
} else {
// Not a deposit, send to first worker via round-robin
if let Err(err) = handles[0].send(indexed_tx) {
warn!(
target: "engine::tree::prewarm",
task_idx = 0,
error = %err,
"Failed to send transaction to worker"
);
}
}
let _ = handles[task_idx].send(executable);
executing += 1;
tx_index += 1;
}
// Process remaining transactions with round-robin distribution
while let Ok(executable) = pending.recv() {
let indexed_tx = IndexedTransaction { index: tx_index, tx: executable };
let task_idx = executing % workers_needed;
if let Err(err) = handles[task_idx].send(indexed_tx) {
warn!(
target: "engine::tree::prewarm",
task_idx,
error = %err,
"Failed to send transaction to worker"
);
}
executing += 1;
tx_index += 1;
}
// drop handle and wait for all tasks to finish and drop theirs
@@ -191,7 +286,7 @@ where
/// was cancelled.
pub(super) fn run(
self,
pending: mpsc::Receiver<impl ExecutableTxFor<Evm> + Send + 'static>,
pending: mpsc::Receiver<impl ExecutableTxFor<Evm> + Clone + Send + 'static>,
actions_tx: Sender<PrewarmTaskEvent>,
) {
// spawn execution tasks.
@@ -334,15 +429,17 @@ where
///
/// Note: There are no ordering guarantees; this does not reflect the state produced by
/// sequential execution.
fn transact_batch(
fn transact_batch<Tx>(
self,
txs: mpsc::Receiver<impl ExecutableTxFor<Evm>>,
txs: mpsc::Receiver<IndexedTransaction<Tx>>,
sender: Sender<PrewarmTaskEvent>,
done_tx: Sender<()>,
) {
) where
Tx: ExecutableTxFor<Evm>,
{
let Some((mut evm, metrics, terminate_execution)) = self.evm_for_ctx() else { return };
while let Ok(tx) = txs.recv() {
while let Ok(IndexedTransaction { index, tx }) = txs.recv() {
// If the task was cancelled, stop execution, send an empty result to notify the task,
// and exit.
if terminate_execution.load(Ordering::Relaxed) {
@@ -370,16 +467,40 @@ where
};
metrics.execution_duration.record(start.elapsed());
let (targets, storage_targets) = multiproof_targets_from_state(res.state);
metrics.prefetch_storage_targets.record(storage_targets as f64);
metrics.total_runtime.record(start.elapsed());
// Only send outcome for transactions after the first txn
// as the main execution will be just as fast
if index > 0 {
let (targets, storage_targets) = multiproof_targets_from_state(res.state);
metrics.prefetch_storage_targets.record(storage_targets as f64);
let _ = sender.send(PrewarmTaskEvent::Outcome { proof_targets: Some(targets) });
}
let _ = sender.send(PrewarmTaskEvent::Outcome { proof_targets: Some(targets) });
metrics.total_runtime.record(start.elapsed());
}
// send a message to the main task to flag that we're done
let _ = done_tx.send(());
}
/// Spawns a dedicated prewarm worker on the blocking executor and returns the
/// channel used to feed it indexed transactions.
///
/// The worker clones this context and runs `transact_batch` over everything
/// received on the returned sender; outcomes are reported through
/// `actions_tx` and completion is signalled via `done_tx` once the channel is
/// closed.
fn spawn_worker<Tx>(
    &self,
    executor: &WorkloadExecutor,
    actions_tx: Sender<PrewarmTaskEvent>,
    done_tx: Sender<()>,
) -> mpsc::Sender<IndexedTransaction<Tx>>
where
    Tx: ExecutableTxFor<Evm> + Clone + Send + 'static,
{
    let (work_tx, work_rx) = mpsc::channel();
    let worker_ctx = self.clone();
    executor.spawn_blocking(move || {
        worker_ctx.transact_batch(work_rx, actions_tx, done_tx);
    });
    work_tx
}
}
/// Returns a set of [`MultiProofTargets`] and the total amount of storage targets, based on the