diff --git a/Cargo.lock b/Cargo.lock index ce6edea11e..8f609a9b74 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8659,6 +8659,7 @@ dependencies = [ "reth-node-ethereum", "reth-node-metrics", "reth-rpc-server-types", + "reth-tasks", "reth-tracing", "tempfile", "tracing", diff --git a/crates/cli/runner/src/lib.rs b/crates/cli/runner/src/lib.rs index c60ce2eb4e..7c359d421b 100644 --- a/crates/cli/runner/src/lib.rs +++ b/crates/cli/runner/src/lib.rs @@ -30,8 +30,14 @@ impl CliRunner { /// /// The default runtime is multi-threaded, with both I/O and time drivers enabled. pub fn try_default_runtime() -> Result { - let runtime = - reth_tasks::RuntimeBuilder::new(reth_tasks::RuntimeConfig::default()).build()?; + Self::try_with_runtime_config(reth_tasks::RuntimeConfig::default()) + } + + /// Creates a new [`CliRunner`] with the given [`RuntimeConfig`](reth_tasks::RuntimeConfig). + pub fn try_with_runtime_config( + config: reth_tasks::RuntimeConfig, + ) -> Result { + let runtime = reth_tasks::RuntimeBuilder::new(config).build()?; Ok(Self { config: CliRunnerConfig::default(), runtime }) } diff --git a/crates/engine/tree/src/tree/payload_processor/mod.rs b/crates/engine/tree/src/tree/payload_processor/mod.rs index 9306288b5c..00407f3def 100644 --- a/crates/engine/tree/src/tree/payload_processor/mod.rs +++ b/crates/engine/tree/src/tree/payload_processor/mod.rs @@ -287,15 +287,7 @@ where // Create and spawn the storage proof task let task_ctx = ProofTaskCtx::new(multiproof_provider_factory); - let storage_worker_count = config.storage_worker_count(); - let account_worker_count = config.account_worker_count(); - let proof_handle = ProofWorkerHandle::new( - &self.executor, - task_ctx, - storage_worker_count, - account_worker_count, - v2_proofs_enabled, - ); + let proof_handle = ProofWorkerHandle::new(&self.executor, task_ctx, v2_proofs_enabled); if config.disable_trie_cache() { let multi_proof_task = MultiProofTask::new( diff --git a/crates/engine/tree/src/tree/payload_processor/multiproof.rs b/crates/engine/tree/src/tree/payload_processor/multiproof.rs index a1ae2e7c26..b4b896df4c 100644 --- a/crates/engine/tree/src/tree/payload_processor/multiproof.rs +++ b/crates/engine/tree/src/tree/payload_processor/multiproof.rs @@ -1571,7 +1571,7 @@ mod tests { let changeset_cache = ChangesetCache::new(); let overlay_factory = OverlayStateProviderFactory::new(factory, changeset_cache); let task_ctx = ProofTaskCtx::new(overlay_factory); - let proof_handle = ProofWorkerHandle::new(runtime, task_ctx, 1, 1, false); + let proof_handle = ProofWorkerHandle::new(runtime, task_ctx, false); let (to_sparse_trie, _receiver) = std::sync::mpsc::channel(); let (tx, rx) = crossbeam_channel::unbounded(); diff --git a/crates/ethereum/cli/Cargo.toml b/crates/ethereum/cli/Cargo.toml index a7b52fb94a..bb31b0a70c 100644 --- a/crates/ethereum/cli/Cargo.toml +++ b/crates/ethereum/cli/Cargo.toml @@ -22,6 +22,7 @@ reth-node-core.workspace = true reth-node-ethereum.workspace = true reth-node-metrics.workspace = true reth-rpc-server-types.workspace = true +reth-tasks.workspace = true reth-tracing.workspace = true reth-node-api.workspace = true diff --git a/crates/ethereum/cli/src/app.rs b/crates/ethereum/cli/src/app.rs index 2c08b57a80..7cb4ec78f1 100644 --- a/crates/ethereum/cli/src/app.rs +++ b/crates/ethereum/cli/src/app.rs @@ -17,6 +17,7 @@ use reth_node_builder::{NodeBuilder, WithLaunchContext}; use reth_node_ethereum::{consensus::EthBeaconConsensus, EthEvmConfig, EthereumNode}; use reth_node_metrics::recorder::install_prometheus_recorder; use reth_rpc_server_types::RpcModuleValidator; +use reth_tasks::RayonConfig; use reth_tracing::{FileWorkerGuard, Layers}; use std::{fmt, sync::Arc}; @@ -153,6 +154,16 @@ where Rpc::validate_selection(ws_api, "ws.api").map_err(|e| eyre!("{e}"))?; } + let rayon_config = RayonConfig { + reserved_cpu_cores: command.engine.reserved_cpu_cores, + proof_storage_worker_threads: command.engine.storage_worker_count, + proof_account_worker_threads: command.engine.account_worker_count, + ..Default::default() + }; + let runner = CliRunner::try_with_runtime_config( + reth_tasks::RuntimeConfig::default().with_rayon(rayon_config), + )?; + runner.run_command_until_exit(|ctx| { command.execute(ctx, FnLauncher::new::(launcher)) }) diff --git a/crates/tasks/src/runtime.rs b/crates/tasks/src/runtime.rs index 5856b2bb39..3616cd10b6 100644 --- a/crates/tasks/src/runtime.rs +++ b/crates/tasks/src/runtime.rs @@ -99,14 +99,17 @@ pub struct RayonConfig { /// Number of threads for the RPC blocking pool (trace calls, `eth_getProof`, etc.). /// If `None`, uses the same as `cpu_threads`. pub rpc_threads: Option, - /// Number of threads for the trie proof computation pool. - /// If `None`, uses the same as `cpu_threads`. - pub trie_threads: Option, /// Number of threads for the storage I/O pool (static file, `RocksDB` writes in /// `save_blocks`). If `None`, uses [`DEFAULT_STORAGE_POOL_THREADS`]. pub storage_threads: Option, /// Maximum number of concurrent blocking tasks for the RPC guard semaphore. pub max_blocking_tasks: usize, + /// Number of threads for the proof storage worker pool (trie storage proof workers). + /// If `None`, derived from available parallelism. + pub proof_storage_worker_threads: Option, + /// Number of threads for the proof account worker pool (trie account proof workers). + /// If `None`, derived from available parallelism. + pub proof_account_worker_threads: Option, } #[cfg(feature = "rayon")] @@ -116,9 +119,10 @@ impl Default for RayonConfig { cpu_threads: None, reserved_cpu_cores: DEFAULT_RESERVED_CPU_CORES, rpc_threads: None, - trie_threads: None, storage_threads: None, max_blocking_tasks: DEFAULT_MAX_BLOCKING_TASKS, + proof_storage_worker_threads: None, + proof_account_worker_threads: None, } } } @@ -143,18 +147,30 @@ impl RayonConfig { self } - /// Set the number of threads for the trie proof pool. - pub const fn with_trie_threads(mut self, trie_threads: usize) -> Self { - self.trie_threads = Some(trie_threads); - self - } - /// Set the number of threads for the storage I/O pool. pub const fn with_storage_threads(mut self, storage_threads: usize) -> Self { self.storage_threads = Some(storage_threads); self } + /// Set the number of threads for the proof storage worker pool. + pub const fn with_proof_storage_worker_threads( + mut self, + proof_storage_worker_threads: usize, + ) -> Self { + self.proof_storage_worker_threads = Some(proof_storage_worker_threads); + self + } + + /// Set the number of threads for the proof account worker pool. + pub const fn with_proof_account_worker_threads( + mut self, + proof_account_worker_threads: usize, + ) -> Self { + self.proof_account_worker_threads = Some(proof_account_worker_threads); + self + } + /// Compute the default number of threads based on available parallelism. fn default_thread_count(&self) -> usize { self.cpu_threads.unwrap_or_else(|| { @@ -232,15 +248,18 @@ struct RuntimeInner { /// RPC blocking pool. #[cfg(feature = "rayon")] rpc_pool: BlockingTaskPool, - /// Trie proof computation pool. - #[cfg(feature = "rayon")] - trie_pool: rayon::ThreadPool, /// Storage I/O pool. #[cfg(feature = "rayon")] storage_pool: rayon::ThreadPool, /// Rate limiter for expensive RPC operations. #[cfg(feature = "rayon")] blocking_guard: BlockingTaskGuard, + /// Proof storage worker pool (trie storage proof computation). + #[cfg(feature = "rayon")] + proof_storage_worker_pool: rayon::ThreadPool, + /// Proof account worker pool (trie account proof computation). + #[cfg(feature = "rayon")] + proof_account_worker_pool: rayon::ThreadPool, /// Handle to the spawned [`TaskManager`] background task. /// The task monitors critical tasks for panics and fires the shutdown signal. /// Can be taken via [`Runtime::take_task_manager_handle`] to poll for panic errors. @@ -313,12 +332,6 @@ impl Runtime { &self.0.rpc_pool } - /// Get the trie proof computation pool. - #[cfg(feature = "rayon")] - pub fn trie_pool(&self) -> &rayon::ThreadPool { - &self.0.trie_pool - } - /// Get the storage I/O pool. #[cfg(feature = "rayon")] pub fn storage_pool(&self) -> &rayon::ThreadPool { @@ -331,34 +344,16 @@ impl Runtime { self.0.blocking_guard.clone() } - /// Run a closure on the CPU pool, blocking the current thread until completion. + /// Get the proof storage worker pool. #[cfg(feature = "rayon")] - pub fn install_cpu(&self, f: F) -> R - where - F: FnOnce() -> R + Send, - R: Send, - { - self.cpu_pool().install(f) + pub fn proof_storage_worker_pool(&self) -> &rayon::ThreadPool { + &self.0.proof_storage_worker_pool } - /// Spawn a CPU-bound task on the RPC pool and return an async handle. + /// Get the proof account worker pool. #[cfg(feature = "rayon")] - pub fn spawn_rpc(&self, f: F) -> crate::pool::BlockingTaskHandle - where - F: FnOnce() -> R + Send + 'static, - R: Send + 'static, - { - self.rpc_pool().spawn(f) - } - - /// Run a closure on the trie pool, blocking the current thread until completion. - #[cfg(feature = "rayon")] - pub fn install_trie(&self, f: F) -> R - where - F: FnOnce() -> R + Send, - R: Send, - { - self.trie_pool().install(f) + pub fn proof_account_worker_pool(&self) -> &rayon::ThreadPool { + &self.0.proof_account_worker_pool } } @@ -395,9 +390,10 @@ impl Runtime { cpu_threads: Some(2), reserved_cpu_cores: 0, rpc_threads: Some(2), - trie_threads: Some(2), storage_threads: Some(2), max_blocking_tasks: 16, + proof_storage_worker_threads: Some(2), + proof_account_worker_threads: Some(2), }, } } @@ -821,46 +817,69 @@ impl RuntimeBuilder { TaskManager::new_parts(handle.clone()); #[cfg(feature = "rayon")] - let (cpu_pool, rpc_pool, trie_pool, storage_pool, blocking_guard) = { + let ( + cpu_pool, + rpc_pool, + storage_pool, + blocking_guard, + proof_storage_worker_pool, + proof_account_worker_pool, + ) = { let default_threads = config.rayon.default_thread_count(); let rpc_threads = config.rayon.rpc_threads.unwrap_or(default_threads); - let trie_threads = config.rayon.trie_threads.unwrap_or(default_threads); let cpu_pool = rayon::ThreadPoolBuilder::new() .num_threads(default_threads) - .thread_name(|i| format!("reth-cpu-{i}")) + .thread_name(|i| format!("cpu-{i}")) .build()?; let rpc_raw = rayon::ThreadPoolBuilder::new() .num_threads(rpc_threads) - .thread_name(|i| format!("reth-rpc-{i}")) + .thread_name(|i| format!("rpc-{i}")) .build()?; let rpc_pool = BlockingTaskPool::new(rpc_raw); - let trie_pool = rayon::ThreadPoolBuilder::new() - .num_threads(trie_threads) - .thread_name(|i| format!("reth-trie-{i}")) - .build()?; - let storage_threads = config.rayon.storage_threads.unwrap_or(DEFAULT_STORAGE_POOL_THREADS); let storage_pool = rayon::ThreadPoolBuilder::new() .num_threads(storage_threads) - .thread_name(|i| format!("reth-storage-{i}")) + .thread_name(|i| format!("storage-{i}")) .build()?; let blocking_guard = BlockingTaskGuard::new(config.rayon.max_blocking_tasks); + let proof_storage_worker_threads = + config.rayon.proof_storage_worker_threads.unwrap_or(default_threads); + let proof_storage_worker_pool = rayon::ThreadPoolBuilder::new() + .num_threads(proof_storage_worker_threads) + .thread_name(|i| format!("proof-strg-{i}")) + .build()?; + + let proof_account_worker_threads = + config.rayon.proof_account_worker_threads.unwrap_or(default_threads); + let proof_account_worker_pool = rayon::ThreadPoolBuilder::new() + .num_threads(proof_account_worker_threads) + .thread_name(|i| format!("proof-acct-{i}")) + .build()?; + debug!( default_threads, rpc_threads, - trie_threads, storage_threads, + proof_storage_worker_threads, + proof_account_worker_threads, max_blocking_tasks = config.rayon.max_blocking_tasks, "Initialized rayon thread pools" ); - (cpu_pool, rpc_pool, trie_pool, storage_pool, blocking_guard) + ( + cpu_pool, + rpc_pool, + storage_pool, + blocking_guard, + proof_storage_worker_pool, + proof_account_worker_pool, + ) }; let task_manager_handle = handle.spawn(async move { @@ -883,11 +902,13 @@ impl RuntimeBuilder { #[cfg(feature = "rayon")] rpc_pool, #[cfg(feature = "rayon")] - trie_pool, - #[cfg(feature = "rayon")] storage_pool, #[cfg(feature = "rayon")] blocking_guard, + #[cfg(feature = "rayon")] + proof_storage_worker_pool, + #[cfg(feature = "rayon")] + proof_account_worker_pool, task_manager_handle: Mutex::new(Some(task_manager_handle)), }; diff --git a/crates/trie/parallel/src/proof.rs b/crates/trie/parallel/src/proof.rs index 5c34d0367e..81ed3f9188 100644 --- a/crates/trie/parallel/src/proof.rs +++ b/crates/trie/parallel/src/proof.rs @@ -325,7 +325,7 @@ mod tests { reth_provider::providers::OverlayStateProviderFactory::new(factory, changeset_cache); let task_ctx = ProofTaskCtx::new(factory); let runtime = reth_tasks::Runtime::test(); - let proof_worker_handle = ProofWorkerHandle::new(&runtime, task_ctx, 1, 1, false); + let proof_worker_handle = ProofWorkerHandle::new(&runtime, task_ctx, false); let parallel_result = ParallelProof::new(Default::default(), proof_worker_handle.clone()) .decoded_multiproof(targets.clone()) diff --git a/crates/trie/parallel/src/proof_task.rs b/crates/trie/parallel/src/proof_task.rs index 6ea3910d65..990c17117e 100644 --- a/crates/trie/parallel/src/proof_task.rs +++ b/crates/trie/parallel/src/proof_task.rs @@ -134,14 +134,10 @@ impl ProofWorkerHandle { /// # Parameters /// - `runtime`: The centralized runtime used to spawn blocking worker tasks /// - `task_ctx`: Shared context with database view and prefix sets - /// - `storage_worker_count`: Number of storage workers to spawn - /// - `account_worker_count`: Number of account workers to spawn /// - `v2_proofs_enabled`: Whether to enable V2 storage proofs pub fn new( runtime: &Runtime, task_ctx: ProofTaskCtx, - storage_worker_count: usize, - account_worker_count: usize, v2_proofs_enabled: bool, ) -> Self where @@ -153,13 +149,14 @@ impl ProofWorkerHandle { let (storage_work_tx, storage_work_rx) = unbounded::(); let (account_work_tx, account_work_rx) = unbounded::(); - // Initialize availability counters at zero. Each worker will increment when it - // successfully initializes, ensuring only healthy workers are counted. let storage_available_workers = Arc::::default(); let account_available_workers = Arc::::default(); let cached_storage_roots = Arc::>::default(); + let storage_worker_count = runtime.proof_storage_worker_pool().current_num_threads(); + let account_worker_count = runtime.proof_account_worker_pool().current_num_threads(); + debug!( target: "trie::proof_task", storage_worker_count, @@ -168,113 +165,97 @@ impl ProofWorkerHandle { "Spawning proof worker pools" ); - let this = Self { - storage_work_tx: storage_work_tx.clone(), + let storage_pool = runtime.proof_storage_worker_pool(); + let task_ctx_for_storage = task_ctx.clone(); + let cached_storage_roots_for_storage = cached_storage_roots.clone(); + + for worker_id in 0..storage_worker_count { + let span = debug_span!(target: "trie::proof_task", "storage worker", ?worker_id); + let task_ctx_clone = task_ctx_for_storage.clone(); + let work_rx_clone = storage_work_rx.clone(); + let storage_available_workers_clone = storage_available_workers.clone(); + let cached_storage_roots = cached_storage_roots_for_storage.clone(); + + storage_pool.spawn(move || { + #[cfg(feature = "metrics")] + let metrics = ProofTaskTrieMetrics::default(); + #[cfg(feature = "metrics")] + let cursor_metrics = ProofTaskCursorMetrics::new(); + + let _guard = span.enter(); + let worker = StorageProofWorker::new( + task_ctx_clone, + work_rx_clone, + worker_id, + storage_available_workers_clone, + cached_storage_roots, + #[cfg(feature = "metrics")] + metrics, + #[cfg(feature = "metrics")] + cursor_metrics, + ) + .with_v2_proofs(v2_proofs_enabled); + if let Err(error) = worker.run() { + error!( + target: "trie::proof_task", + worker_id, + ?error, + "Storage worker failed" + ); + } + }); + } + + let account_pool = runtime.proof_account_worker_pool(); + + for worker_id in 0..account_worker_count { + let span = debug_span!(target: "trie::proof_task", "account worker", ?worker_id); + let task_ctx_clone = task_ctx.clone(); + let work_rx_clone = account_work_rx.clone(); + let storage_work_tx_clone = storage_work_tx.clone(); + let account_available_workers_clone = account_available_workers.clone(); + let cached_storage_roots = cached_storage_roots.clone(); + + account_pool.spawn(move || { + #[cfg(feature = "metrics")] + let metrics = ProofTaskTrieMetrics::default(); + #[cfg(feature = "metrics")] + let cursor_metrics = ProofTaskCursorMetrics::new(); + + let _guard = span.enter(); + let worker = AccountProofWorker::new( + task_ctx_clone, + work_rx_clone, + worker_id, + storage_work_tx_clone, + account_available_workers_clone, + cached_storage_roots, + #[cfg(feature = "metrics")] + metrics, + #[cfg(feature = "metrics")] + cursor_metrics, + ) + .with_v2_proofs(v2_proofs_enabled); + if let Err(error) = worker.run() { + error!( + target: "trie::proof_task", + worker_id, + ?error, + "Account worker failed" + ); + } + }); + } + + Self { + storage_work_tx, account_work_tx, - storage_available_workers: storage_available_workers.clone(), - account_available_workers: account_available_workers.clone(), + storage_available_workers, + account_available_workers, storage_worker_count, account_worker_count, v2_proofs_enabled, - }; - - let executor = runtime.handle().clone(); - - let task_ctx_for_storage = task_ctx.clone(); - let executor_for_storage = executor.clone(); - let cached_storage_roots_for_storage = cached_storage_roots.clone(); - - executor_for_storage.clone().spawn_blocking(move || { - let parent_span = - debug_span!(target: "trie::proof_task", "storage proof workers", ?storage_worker_count) - .entered(); - // Spawn storage workers - for worker_id in 0..storage_worker_count { - let span = debug_span!(target: "trie::proof_task", "storage worker", ?worker_id); - let task_ctx_clone = task_ctx_for_storage.clone(); - let work_rx_clone = storage_work_rx.clone(); - let storage_available_workers_clone = storage_available_workers.clone(); - let cached_storage_roots = cached_storage_roots_for_storage.clone(); - - executor_for_storage.spawn_blocking(move || { - #[cfg(feature = "metrics")] - let metrics = ProofTaskTrieMetrics::default(); - #[cfg(feature = "metrics")] - let cursor_metrics = ProofTaskCursorMetrics::new(); - - let _guard = span.enter(); - let worker = StorageProofWorker::new( - task_ctx_clone, - work_rx_clone, - worker_id, - storage_available_workers_clone, - cached_storage_roots, - #[cfg(feature = "metrics")] - metrics, - #[cfg(feature = "metrics")] - cursor_metrics, - ) - .with_v2_proofs(v2_proofs_enabled); - if let Err(error) = worker.run() { - error!( - target: "trie::proof_task", - worker_id, - ?error, - "Storage worker failed" - ); - } - }); - } - drop(parent_span); - }); - - executor.clone().spawn_blocking(move || { - let parent_span = - debug_span!(target: "trie::proof_task", "account proof workers", ?account_worker_count) - .entered(); - // Spawn account workers - for worker_id in 0..account_worker_count { - let span = debug_span!(target: "trie::proof_task", "account worker", ?worker_id); - let task_ctx_clone = task_ctx.clone(); - let work_rx_clone = account_work_rx.clone(); - let storage_work_tx_clone = storage_work_tx.clone(); - let account_available_workers_clone = account_available_workers.clone(); - let cached_storage_roots = cached_storage_roots.clone(); - - executor.spawn_blocking(move || { - #[cfg(feature = "metrics")] - let metrics = ProofTaskTrieMetrics::default(); - #[cfg(feature = "metrics")] - let cursor_metrics = ProofTaskCursorMetrics::new(); - - let _guard = span.enter(); - let worker = AccountProofWorker::new( - task_ctx_clone, - work_rx_clone, - worker_id, - storage_work_tx_clone, - account_available_workers_clone, - cached_storage_roots, - #[cfg(feature = "metrics")] - metrics, - #[cfg(feature = "metrics")] - cursor_metrics, - ) - .with_v2_proofs(v2_proofs_enabled); - if let Err(error) = worker.run() { - error!( - target: "trie::proof_task", - worker_id, - ?error, - "Account worker failed" - ); - } - }); - } - drop(parent_span); - }); - - this + } } /// Returns whether V2 storage proofs are enabled for this worker pool. @@ -2039,7 +2020,7 @@ mod tests { let ctx = test_ctx(factory); let runtime = reth_tasks::Runtime::test(); - let proof_handle = ProofWorkerHandle::new(&runtime, ctx, 5, 3, false); + let proof_handle = ProofWorkerHandle::new(&runtime, ctx, false); // Verify handle can be cloned let _cloned_handle = proof_handle.clone();