diff --git a/crates/cli/runner/src/lib.rs b/crates/cli/runner/src/lib.rs index 3ea3446a03..fdb04b1e2d 100644 --- a/crates/cli/runner/src/lib.rs +++ b/crates/cli/runner/src/lib.rs @@ -219,7 +219,14 @@ impl CliRunnerConfig { /// Creates a new default tokio multi-thread [Runtime](tokio::runtime::Runtime) with all features /// enabled pub fn tokio_runtime() -> Result { - tokio::runtime::Builder::new_multi_thread().enable_all().build() + tokio::runtime::Builder::new_multi_thread() + .enable_all() + // Keep the threads alive for at least the block time (12 seconds) plus buffer. + // This prevents the costly process of spawning new threads on every + // new block, and instead reuses the existing threads. + .thread_keep_alive(Duration::from_secs(15)) + .thread_name("tokio-rt") + .build() } /// Runs the given future to completion or until a critical task panicked. diff --git a/crates/engine/tree/src/tree/payload_processor/executor.rs b/crates/engine/tree/src/tree/payload_processor/executor.rs index 8409a4a06b..410c344a59 100644 --- a/crates/engine/tree/src/tree/payload_processor/executor.rs +++ b/crates/engine/tree/src/tree/payload_processor/executor.rs @@ -1,10 +1,7 @@ //! Executor for mixed I/O and CPU workloads. -use std::{sync::OnceLock, time::Duration}; -use tokio::{ - runtime::{Builder, Handle, Runtime}, - task::JoinHandle, -}; +use reth_trie_parallel::root::get_tokio_runtime_handle; +use tokio::{runtime::Handle, task::JoinHandle}; /// An executor for mixed I/O and CPU workloads. /// @@ -27,7 +24,7 @@ impl WorkloadExecutor { &self.inner.handle } - /// Shorthand for [`Runtime::spawn_blocking`] + /// Runs the provided function on an executor dedicated to blocking operations. #[track_caller] pub fn spawn_blocking(&self, func: F) -> JoinHandle where @@ -45,29 +42,6 @@ struct WorkloadExecutorInner { impl WorkloadExecutorInner { fn new() -> Self { - fn get_runtime_handle() -> Handle { - Handle::try_current().unwrap_or_else(|_| { - // Create a new runtime if no runtime is available - static RT: OnceLock = OnceLock::new(); - - let rt = RT.get_or_init(|| { - Builder::new_multi_thread() - .enable_all() - // Keep the threads alive for at least the block time, which is 12 seconds - // at the time of writing, plus a little extra. - // - // This is to prevent the costly process of spawning new threads on every - // new block, and instead reuse the existing - // threads. - .thread_keep_alive(Duration::from_secs(15)) - .build() - .unwrap() - }); - - rt.handle().clone() - }) - } - - Self { handle: get_runtime_handle() } + Self { handle: get_tokio_runtime_handle() } } } diff --git a/crates/trie/parallel/src/root.rs b/crates/trie/parallel/src/root.rs index cffdf083bf..1736fa3ff0 100644 --- a/crates/trie/parallel/src/root.rs +++ b/crates/trie/parallel/src/root.rs @@ -98,7 +98,7 @@ where let mut storage_roots = HashMap::with_capacity(storage_root_targets.len()); // Get runtime handle once outside the loop - let handle = get_runtime_handle(); + let handle = get_tokio_runtime_handle(); for (hashed_address, prefix_set) in storage_root_targets.into_iter().sorted_unstable_by_key(|(address, _)| *address) @@ -273,17 +273,19 @@ impl From for ParallelStateRootError { /// Gets or creates a tokio runtime handle for spawning blocking tasks. /// This ensures we always have a runtime available for I/O operations. -fn get_runtime_handle() -> Handle { +pub fn get_tokio_runtime_handle() -> Handle { Handle::try_current().unwrap_or_else(|_| { // Create a new runtime if no runtime is available static RT: OnceLock = OnceLock::new(); let rt = RT.get_or_init(|| { Builder::new_multi_thread() + .enable_all() // Keep the threads alive for at least the block time (12 seconds) plus buffer. // This prevents the costly process of spawning new threads on every // new block, and instead reuses the existing threads. .thread_keep_alive(Duration::from_secs(15)) + .thread_name("trie-tokio-rt") .build() .expect("Failed to create tokio runtime") });