diff --git a/crates/engine/primitives/src/config.rs b/crates/engine/primitives/src/config.rs index b34f368c7f..d5c5a98d30 100644 --- a/crates/engine/primitives/src/config.rs +++ b/crates/engine/primitives/src/config.rs @@ -42,18 +42,6 @@ pub const DEFAULT_MULTIPROOF_TASK_CHUNK_SIZE_V2: usize = DEFAULT_MULTIPROOF_TASK /// This will be deducted from the thread count of main reth global threadpool. pub const DEFAULT_RESERVED_CPU_CORES: usize = 1; -/// Returns the default maximum concurrency for prewarm task based on available parallelism. -fn default_prewarm_max_concurrency() -> usize { - #[cfg(feature = "std")] - { - std::thread::available_parallelism().map_or(16, |n| n.get()) - } - #[cfg(not(feature = "std"))] - { - 16 - } -} - /// Default depth for sparse trie pruning. /// /// Nodes at this depth and below are converted to hash stubs to reduce memory. @@ -161,8 +149,6 @@ pub struct TreeConfig { /// where immediate payload regeneration is desired despite the head not changing or moving to /// an ancestor. always_process_payload_attributes_on_canonical_head: bool, - /// Maximum concurrency for the prewarm task. - prewarm_max_concurrency: usize, /// Whether to unwind canonical header to ancestor during forkchoice updates. allow_unwind_canonical_header: bool, /// Number of storage proof worker threads. 
@@ -209,7 +195,6 @@ impl Default for TreeConfig { precompile_cache_disabled: false, state_root_fallback: false, always_process_payload_attributes_on_canonical_head: false, - prewarm_max_concurrency: default_prewarm_max_concurrency(), allow_unwind_canonical_header: false, storage_worker_count: default_storage_worker_count(), account_worker_count: default_account_worker_count(), @@ -246,7 +231,6 @@ impl TreeConfig { precompile_cache_disabled: bool, state_root_fallback: bool, always_process_payload_attributes_on_canonical_head: bool, - prewarm_max_concurrency: usize, allow_unwind_canonical_header: bool, storage_worker_count: usize, account_worker_count: usize, @@ -275,7 +259,6 @@ impl TreeConfig { precompile_cache_disabled, state_root_fallback, always_process_payload_attributes_on_canonical_head, - prewarm_max_concurrency, allow_unwind_canonical_header, storage_worker_count, account_worker_count, @@ -533,17 +516,6 @@ impl TreeConfig { self.has_enough_parallelism && !self.legacy_state_root } - /// Setter for prewarm max concurrency. - pub const fn with_prewarm_max_concurrency(mut self, prewarm_max_concurrency: usize) -> Self { - self.prewarm_max_concurrency = prewarm_max_concurrency; - self - } - - /// Return the prewarm max concurrency. - pub const fn prewarm_max_concurrency(&self) -> usize { - self.prewarm_max_concurrency - } - /// Return the number of storage proof worker threads. pub const fn storage_worker_count(&self) -> usize { self.storage_worker_count diff --git a/crates/engine/tree/src/tree/payload_processor/mod.rs b/crates/engine/tree/src/tree/payload_processor/mod.rs index b326c26169..64ce3c4cf0 100644 --- a/crates/engine/tree/src/tree/payload_processor/mod.rs +++ b/crates/engine/tree/src/tree/payload_processor/mod.rs @@ -132,8 +132,6 @@ where /// re-use allocated memory. Stored with the block hash it was computed for to enable trie /// preservation across sequential payload validations. 
sparse_state_trie: SharedPreservedSparseTrie, - /// Maximum concurrency for prewarm task. - prewarm_max_concurrency: usize, /// Sparse trie prune depth. sparse_trie_prune_depth: usize, /// Maximum storage tries to retain after pruning. @@ -172,7 +170,6 @@ where precompile_cache_disabled: config.precompile_cache_disabled(), precompile_cache_map, sparse_state_trie: SharedPreservedSparseTrie::default(), - prewarm_max_concurrency: config.prewarm_max_concurrency(), sparse_trie_prune_depth: config.sparse_trie_prune_depth(), sparse_trie_max_storage_tries: config.sparse_trie_max_storage_tries(), disable_sparse_trie_cache_pruning: config.disable_sparse_trie_cache_pruning(), @@ -467,7 +464,6 @@ where self.execution_cache.clone(), prewarm_ctx, to_multi_proof, - self.prewarm_max_concurrency, ); { diff --git a/crates/engine/tree/src/tree/payload_processor/prewarm.rs b/crates/engine/tree/src/tree/payload_processor/prewarm.rs index 2b45acad2f..8881ef9742 100644 --- a/crates/engine/tree/src/tree/payload_processor/prewarm.rs +++ b/crates/engine/tree/src/tree/payload_processor/prewarm.rs @@ -14,7 +14,7 @@ use crate::tree::{ cached_state::{CachedStateProvider, SavedCache}, payload_processor::{ - bal::{self, total_slots, BALSlotIter}, + bal, multiproof::{MultiProofMessage, VersionedMultiProofTargets}, PayloadExecutionCache, }, @@ -25,9 +25,10 @@ use alloy_consensus::transaction::TxHashRef; use alloy_eip7928::BlockAccessList; use alloy_eips::eip4895::Withdrawal; use alloy_evm::Database; -use alloy_primitives::{keccak256, map::B256Set, B256}; +use alloy_primitives::{keccak256, map::B256Set, StorageKey, B256}; use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender}; use metrics::{Counter, Gauge, Histogram}; +use rayon::prelude::*; use reth_evm::{execute::ExecutableTxFor, ConfigureEvm, Evm, EvmFor, RecoveredTx, SpecFor}; use reth_metrics::Metrics; use reth_primitives_traits::NodePrimitives; @@ -39,7 +40,6 @@ use reth_revm::{database::StateProviderDatabase, 
state::EvmState}; use reth_tasks::Runtime; use reth_trie::MultiProofTargets; use std::{ - ops::Range, sync::{ atomic::{AtomicBool, Ordering}, mpsc::{self, channel, Receiver, Sender, SyncSender}, @@ -86,8 +86,6 @@ where execution_cache: PayloadExecutionCache, /// Context provided to execution tasks ctx: PrewarmContext, - /// How many transactions should be executed in parallel - max_concurrency: usize, /// Sender to emit evm state outcome messages, if any. to_multi_proof: Option>, /// Receiver for events produced by tx execution @@ -108,13 +106,12 @@ where execution_cache: PayloadExecutionCache, ctx: PrewarmContext, to_multi_proof: Option>, - max_concurrency: usize, ) -> (Self, Sender>) { let (actions_tx, actions_rx) = channel(); trace!( target: "engine::tree::payload_processor::prewarm", - max_concurrency, + prewarming_threads = executor.prewarming_pool().current_num_threads(), transaction_count = ctx.env.transaction_count, "Initialized prewarm task" ); @@ -124,7 +121,6 @@ where executor, execution_cache, ctx, - max_concurrency, to_multi_proof, actions_rx, parent_span: Span::current(), @@ -148,20 +144,18 @@ where { let executor = self.executor.clone(); let ctx = self.ctx.clone(); - let max_concurrency = self.max_concurrency; let span = Span::current(); self.executor.spawn_blocking(move || { let _enter = debug_span!(target: "engine::tree::payload_processor::prewarm", parent: span, "spawn_all").entered(); - // When transaction_count is 0, it means the count is unknown. In this case, spawn - // max workers to handle potentially many transactions in parallel rather - // than bottlenecking on a single worker. - let transaction_count = ctx.env.transaction_count; - let workers_needed = if transaction_count == 0 { - max_concurrency + let pool_threads = executor.prewarming_pool().current_num_threads(); + // Don't spawn more workers than transactions. When transaction_count is 0 + // (unknown), use all pool threads. 
+ let workers_needed = if ctx.env.transaction_count > 0 { + ctx.env.transaction_count.min(pool_threads) } else { - transaction_count.min(max_concurrency) + pool_threads }; let (done_tx, done_rx) = mpsc::sync_channel(workers_needed); @@ -274,10 +268,8 @@ where } } - /// Runs BAL-based prewarming by spawning workers to prefetch storage slots. - /// - /// Divides the total slots across `max_concurrency` workers, each responsible for - /// prefetching a range of slots from the BAL. + /// Runs BAL-based prewarming by using the prewarming pool's parallel iterator to prefetch + /// accounts and storage slots. #[instrument(level = "debug", target = "engine::tree::payload_processor::prewarm", skip_all)] fn run_bal_prewarm( &self, @@ -296,59 +288,35 @@ where return; } - let total_slots = total_slots(&bal); - - trace!( - target: "engine::tree::payload_processor::prewarm", - total_slots, - max_concurrency = self.max_concurrency, - "Starting BAL prewarm" - ); - - if total_slots == 0 { + if bal.is_empty() { self.send_bal_hashed_state(&bal); let _ = actions_tx.send(PrewarmTaskEvent::FinishedTxExecution { executed_transactions: 0 }); return; } - // Calculate number of workers needed (at most max_concurrency) - let workers_needed = total_slots.min(self.max_concurrency); + trace!( + target: "engine::tree::payload_processor::prewarm", + accounts = bal.len(), + "Starting BAL prewarm" + ); - let (done_tx, done_rx) = mpsc::sync_channel(workers_needed); - - // Calculate slots per worker - let slots_per_worker = total_slots / workers_needed; - let remainder = total_slots % workers_needed; - - // Spawn workers with their assigned ranges - for i in 0..workers_needed { - let start = i * slots_per_worker + i.min(remainder); - let extra = if i < remainder { 1 } else { 0 }; - let end = start + slots_per_worker + extra; - - self.ctx.spawn_bal_worker( - i, - &self.executor, - Arc::clone(&bal), - start..end, - done_tx.clone(), + let ctx = self.ctx.clone(); + 
self.executor.prewarming_pool().install(|| { + bal.par_iter().for_each_init( + || (ctx.clone(), None::>), + |(ctx, provider), account| { + if ctx.terminate_execution.load(Ordering::Relaxed) { + return; + } + ctx.prefetch_bal_account(provider, account); + }, ); - } - - // Drop our handle to done_tx so we can detect completion - drop(done_tx); - - // Wait for all workers to complete - let mut completed_workers = 0; - while done_rx.recv().is_ok() { - completed_workers += 1; - } + }); trace!( target: "engine::tree::payload_processor::prewarm", - completed_workers, - "All BAL prewarm workers completed" + "All BAL prewarm accounts completed" ); // Convert BAL to HashedPostState and send to multiproof task @@ -668,115 +636,65 @@ where let (tx_sender, tx_receiver) = crossbeam_channel::unbounded(); // Spawn workers that all pull from the shared receiver - let executor = task_executor.clone(); let span = Span::current(); - task_executor.spawn_blocking(move || { - let _enter = span.entered(); - for idx in 0..workers_needed { - let ctx = self.clone(); - let to_multi_proof = to_multi_proof.clone(); - let done_tx = done_tx.clone(); - let rx = tx_receiver.clone(); - let span = debug_span!(target: "engine::tree::payload_processor::prewarm", "prewarm worker", idx); - executor.spawn_blocking(move || { - let _enter = span.entered(); - ctx.transact_batch(rx, to_multi_proof, done_tx); - }); - } - }); + for idx in 0..workers_needed { + let ctx = self.clone(); + let to_multi_proof = to_multi_proof.clone(); + let done_tx = done_tx.clone(); + let rx = tx_receiver.clone(); + let span = debug_span!(target: "engine::tree::payload_processor::prewarm", parent: &span, "prewarm worker", idx); + task_executor.prewarming_pool().spawn(move || { + let _enter = span.entered(); + ctx.transact_batch(rx, to_multi_proof, done_tx); + }); + } tx_sender } - /// Spawns a worker task for BAL slot prefetching. + /// Prefetches a single account and all its storage slots from the BAL into the cache. 
/// - /// The worker iterates over the specified range of slots in the BAL and ensures - /// each slot is loaded into the cache by accessing it through the state provider. - fn spawn_bal_worker( + /// The `provider` is lazily initialized on first call and reused across accounts on the same + /// thread. + fn prefetch_bal_account( &self, - idx: usize, - executor: &Runtime, - bal: Arc, - range: Range, - done_tx: SyncSender<()>, + provider: &mut Option>, + account: &alloy_eip7928::AccountChanges, ) { - let ctx = self.clone(); - let span = debug_span!( - target: "engine::tree::payload_processor::prewarm", - "bal prewarm worker", - idx, - range_start = range.start, - range_end = range.end - ); - - executor.spawn_blocking(move || { - let _enter = span.entered(); - ctx.prefetch_bal_slots(bal, range, done_tx); - }); - } - - /// Prefetches storage slots from a BAL range into the cache. - /// - /// This iterates through the specified range of slots and accesses them via the state - /// provider to populate the cache. - #[instrument(level = "debug", target = "engine::tree::payload_processor::prewarm", skip_all)] - fn prefetch_bal_slots( - self, - bal: Arc, - range: Range, - done_tx: SyncSender<()>, - ) { - let Self { saved_cache, provider, metrics, .. 
} = self; - - // Build state provider - let state_provider = match provider.build() { - Ok(provider) => provider, - Err(err) => { - trace!( - target: "engine::tree::payload_processor::prewarm", - %err, - "Failed to build state provider in BAL prewarm thread" - ); - let _ = done_tx.send(()); - return; + let state_provider = match provider { + Some(p) => p, + slot @ None => { + let built = match self.provider.build() { + Ok(p) => p, + Err(err) => { + trace!( + target: "engine::tree::payload_processor::prewarm", + %err, + "Failed to build state provider in BAL prewarm thread" + ); + return; + } + }; + let saved_cache = + self.saved_cache.as_ref().expect("BAL prewarm should only run with cache"); + let caches = saved_cache.cache().clone(); + let cache_metrics = saved_cache.metrics().clone(); + slot.insert(CachedStateProvider::new(built, caches, cache_metrics)) } }; - // Wrap with cache (guaranteed to be Some since run_bal_prewarm checks) - let saved_cache = saved_cache.expect("BAL prewarm should only run with cache"); - let caches = saved_cache.cache().clone(); - let cache_metrics = saved_cache.metrics().clone(); - let state_provider = CachedStateProvider::new(state_provider, caches, cache_metrics); - let start = Instant::now(); - // Track last seen address to avoid fetching the same account multiple times. 
- let mut last_address = None; + let _ = state_provider.basic_account(&account.address); - // Iterate through the assigned range of slots - for (address, slot) in BALSlotIter::new(&bal, range.clone()) { - // Fetch the account if this is a different address than the last one - if last_address != Some(address) { - let _ = state_provider.basic_account(&address); - last_address = Some(address); - } - - // Access the slot to populate the cache - let _ = state_provider.storage(address, slot); + for slot in &account.storage_changes { + let _ = state_provider.storage(account.address, StorageKey::from(slot.slot)); + } + for &slot in &account.storage_reads { + let _ = state_provider.storage(account.address, StorageKey::from(slot)); } - let elapsed = start.elapsed(); - - trace!( - target: "engine::tree::payload_processor::prewarm", - ?range, - elapsed_ms = elapsed.as_millis(), - "BAL prewarm worker completed" - ); - - // Signal completion - let _ = done_tx.send(()); - metrics.bal_slot_iteration_duration.record(elapsed.as_secs_f64()); + self.metrics.bal_slot_iteration_duration.record(start.elapsed().as_secs_f64()); } } diff --git a/crates/ethereum/cli/src/app.rs b/crates/ethereum/cli/src/app.rs index 7cb4ec78f1..71cd2869e9 100644 --- a/crates/ethereum/cli/src/app.rs +++ b/crates/ethereum/cli/src/app.rs @@ -158,6 +158,7 @@ where reserved_cpu_cores: command.engine.reserved_cpu_cores, proof_storage_worker_threads: command.engine.storage_worker_count, proof_account_worker_threads: command.engine.account_worker_count, + prewarming_threads: command.engine.prewarming_threads, ..Default::default() }; let runner = CliRunner::try_with_runtime_config( diff --git a/crates/node/core/src/args/engine.rs b/crates/node/core/src/args/engine.rs index cc747c5bef..7964b0ff20 100644 --- a/crates/node/core/src/args/engine.rs +++ b/crates/node/core/src/args/engine.rs @@ -38,6 +38,7 @@ pub struct DefaultEngineValues { allow_unwind_canonical_header: bool, storage_worker_count: Option<usize>,
account_worker_count: Option<usize>, + prewarming_threads: Option<usize>, disable_proof_v2: bool, cache_metrics_disabled: bool, disable_trie_cache: bool, @@ -169,6 +170,12 @@ impl DefaultEngineValues { self } + /// Set the default prewarming thread count + pub const fn with_prewarming_threads(mut self, v: Option<usize>) -> Self { + self.prewarming_threads = v; + self + } + /// Set whether to disable proof V2 by default pub const fn with_disable_proof_v2(mut self, v: bool) -> Self { self.disable_proof_v2 = v; self } @@ -233,6 +240,7 @@ impl Default for DefaultEngineValues { allow_unwind_canonical_header: false, storage_worker_count: None, account_worker_count: None, + prewarming_threads: None, disable_proof_v2: false, cache_metrics_disabled: false, disable_trie_cache: false, @@ -360,6 +368,11 @@ pub struct EngineArgs { #[arg(long = "engine.account-worker-count", default_value = Resettable::from(DefaultEngineValues::get_global().account_worker_count.map(|v| v.to_string().into())))] pub account_worker_count: Option<usize>, + /// Configure the number of prewarming threads. + /// If not specified, defaults to available parallelism.
+ #[arg(long = "engine.prewarming-threads", default_value = Resettable::from(DefaultEngineValues::get_global().prewarming_threads.map(|v| v.to_string().into())))] + pub prewarming_threads: Option<usize>, + /// Disable V2 storage proofs for state root calculations #[arg(long = "engine.disable-proof-v2", default_value_t = DefaultEngineValues::get_global().disable_proof_v2)] pub disable_proof_v2: bool, @@ -424,6 +437,7 @@ impl Default for EngineArgs { allow_unwind_canonical_header, storage_worker_count, account_worker_count, + prewarming_threads, disable_proof_v2, cache_metrics_disabled, disable_trie_cache, @@ -455,6 +469,7 @@ allow_unwind_canonical_header, storage_worker_count, account_worker_count, + prewarming_threads, disable_proof_v2, cache_metrics_disabled, disable_trie_cache, @@ -546,6 +561,7 @@ mod tests { allow_unwind_canonical_header: true, storage_worker_count: Some(16), account_worker_count: Some(8), + prewarming_threads: Some(4), disable_proof_v2: false, cache_metrics_disabled: true, disable_trie_cache: true, @@ -582,6 +598,8 @@ "16", "--engine.account-worker-count", "8", + "--engine.prewarming-threads", + "4", "--engine.disable-cache-metrics", "--engine.disable-trie-cache", "--engine.sparse-trie-prune-depth", diff --git a/crates/tasks/src/runtime.rs b/crates/tasks/src/runtime.rs index fafd505332..ffe02957e3 100644 --- a/crates/tasks/src/runtime.rs +++ b/crates/tasks/src/runtime.rs @@ -110,6 +110,9 @@ pub struct RayonConfig { /// Number of threads for the proof account worker pool (trie account proof workers). /// If `None`, derived from available parallelism. pub proof_account_worker_threads: Option<usize>, + /// Number of threads for the prewarming pool (execution prewarming workers). + /// If `None`, derived from available parallelism.
+ pub prewarming_threads: Option<usize>, } #[cfg(feature = "rayon")] @@ -123,6 +126,7 @@ impl Default for RayonConfig { max_blocking_tasks: DEFAULT_MAX_BLOCKING_TASKS, proof_storage_worker_threads: None, proof_account_worker_threads: None, + prewarming_threads: None, } } } @@ -171,6 +175,12 @@ impl RayonConfig { self } + /// Set the number of threads for the prewarming pool. + pub const fn with_prewarming_threads(mut self, prewarming_threads: usize) -> Self { + self.prewarming_threads = Some(prewarming_threads); + self + } + /// Compute the default number of threads based on available parallelism. fn default_thread_count(&self) -> usize { self.cpu_threads.unwrap_or_else(|| { @@ -260,6 +270,9 @@ struct RuntimeInner { /// Proof account worker pool (trie account proof computation). #[cfg(feature = "rayon")] proof_account_worker_pool: rayon::ThreadPool, + /// Prewarming pool (execution prewarming workers). + #[cfg(feature = "rayon")] + prewarming_pool: rayon::ThreadPool, /// Handle to the spawned [`TaskManager`] background task. /// The task monitors critical tasks for panics and fires the shutdown signal. /// Can be taken via [`Runtime::take_task_manager_handle`] to poll for panic errors. @@ -355,6 +368,12 @@ impl Runtime { pub fn proof_account_worker_pool(&self) -> &rayon::ThreadPool { &self.0.proof_account_worker_pool } + + /// Get the prewarming pool.
+ #[cfg(feature = "rayon")] + pub fn prewarming_pool(&self) -> &rayon::ThreadPool { + &self.0.prewarming_pool + } } // ── Test helpers ────────────────────────────────────────────────────── @@ -394,6 +413,7 @@ impl Runtime { max_blocking_tasks: 16, proof_storage_worker_threads: Some(2), proof_account_worker_threads: Some(2), + prewarming_threads: Some(2), }, } } @@ -824,6 +844,7 @@ impl RuntimeBuilder { blocking_guard, proof_storage_worker_pool, proof_account_worker_pool, + prewarming_pool, ) = { let default_threads = config.rayon.default_thread_count(); let rpc_threads = config.rayon.rpc_threads.unwrap_or(default_threads); @@ -862,12 +883,19 @@ impl RuntimeBuilder { .thread_name(|i| format!("proof-acct-{i:02}")) .build()?; + let prewarming_threads = config.rayon.prewarming_threads.unwrap_or(default_threads); + let prewarming_pool = rayon::ThreadPoolBuilder::new() + .num_threads(prewarming_threads) + .thread_name(|i| format!("prewarm-{i:02}")) + .build()?; + debug!( default_threads, rpc_threads, storage_threads, proof_storage_worker_threads, proof_account_worker_threads, + prewarming_threads, max_blocking_tasks = config.rayon.max_blocking_tasks, "Initialized rayon thread pools" ); @@ -879,6 +907,7 @@ impl RuntimeBuilder { blocking_guard, proof_storage_worker_pool, proof_account_worker_pool, + prewarming_pool, ) }; @@ -909,6 +938,8 @@ impl RuntimeBuilder { proof_storage_worker_pool, #[cfg(feature = "rayon")] proof_account_worker_pool, + #[cfg(feature = "rayon")] + prewarming_pool, task_manager_handle: Mutex::new(Some(task_manager_handle)), }; diff --git a/docs/vocs/docs/pages/cli/reth/node.mdx b/docs/vocs/docs/pages/cli/reth/node.mdx index 601e543f10..67fa40bca6 100644 --- a/docs/vocs/docs/pages/cli/reth/node.mdx +++ b/docs/vocs/docs/pages/cli/reth/node.mdx @@ -981,6 +981,9 @@ Engine: --engine.account-worker-count Configure the number of account proof workers in the Tokio blocking pool. 
If not specified, defaults to the same count as storage workers + --engine.prewarming-threads + Configure the number of prewarming threads. If not specified, defaults to available parallelism + --engine.disable-proof-v2 Disable V2 storage proofs for state root calculations