diff --git a/Cargo.lock b/Cargo.lock index aa802f4ec4..903347a7ae 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10345,6 +10345,7 @@ name = "reth-tasks" version = "1.11.0" dependencies = [ "crossbeam-utils", + "dashmap", "futures-util", "metrics", "parking_lot", diff --git a/crates/engine/tree/src/tree/payload_processor/mod.rs b/crates/engine/tree/src/tree/payload_processor/mod.rs index d45d6dd67c..53e5ed08da 100644 --- a/crates/engine/tree/src/tree/payload_processor/mod.rs +++ b/crates/engine/tree/src/tree/payload_processor/mod.rs @@ -496,7 +496,7 @@ where { let to_prewarm_task = to_prewarm_task.clone(); - self.executor.spawn_blocking(move || { + self.executor.spawn_blocking_named("prewarm", move || { let mode = if skip_prewarm { PrewarmMode::Skipped } else if let Some(bal) = bal { @@ -550,7 +550,7 @@ where let executor = self.executor.clone(); let parent_span = Span::current(); - self.executor.spawn_blocking(move || { + self.executor.spawn_blocking_named("sparse-trie", move || { let _enter = debug_span!(target: "engine::tree::payload_processor", parent: parent_span, "sparse_trie_task") .entered(); diff --git a/crates/engine/tree/src/tree/payload_processor/prewarm.rs b/crates/engine/tree/src/tree/payload_processor/prewarm.rs index 801df9e811..f0528a60d1 100644 --- a/crates/engine/tree/src/tree/payload_processor/prewarm.rs +++ b/crates/engine/tree/src/tree/payload_processor/prewarm.rs @@ -139,7 +139,7 @@ where let ctx = self.ctx.clone(); let span = Span::current(); - self.executor.spawn_blocking(move || { + self.executor.spawn_blocking_named("prewarm-spawn", move || { let _enter = debug_span!(target: "engine::tree::payload_processor::prewarm", parent: span, "spawn_all").entered(); let pool_threads = executor.prewarming_pool().current_num_threads(); diff --git a/crates/engine/tree/src/tree/payload_processor/sparse_trie.rs b/crates/engine/tree/src/tree/payload_processor/sparse_trie.rs index 7a041ffdc9..e926298682 100644 --- a/crates/engine/tree/src/tree/payload_processor/sparse_trie.rs +++ b/crates/engine/tree/src/tree/payload_processor/sparse_trie.rs @@ -118,7 +118,7 @@ where let (hashed_state_tx, hashed_state_rx) = crossbeam_channel::unbounded(); let parent_span = tracing::Span::current(); - executor.spawn_blocking(move || { + executor.spawn_blocking_named("trie-hashing", move || { let _span = debug_span!(parent: parent_span, "run_hashing_task").entered(); Self::run_hashing_task(updates, hashed_state_tx) }); diff --git a/crates/engine/tree/src/tree/payload_validator.rs b/crates/engine/tree/src/tree/payload_validator.rs index b159292b53..a18e72712c 100644 --- a/crates/engine/tree/src/tree/payload_validator.rs +++ b/crates/engine/tree/src/tree/payload_validator.rs @@ -339,10 +339,15 @@ where let payload_clone = input.clone(); let validator = self.validator.clone(); let (tx, rx) = tokio::sync::oneshot::channel(); - self.payload_processor.executor().spawn_blocking(move || { - let BlockOrPayload::Payload(payload) = payload_clone else { unreachable!() }; - let _ = tx.send(validator.convert_payload_to_block(payload)); - }); + self.payload_processor.executor().spawn_blocking_named( + "payload-convert", + move || { + let BlockOrPayload::Payload(payload) = payload_clone else { + unreachable!() + }; + let _ = tx.send(validator.convert_payload_to_block(payload)); + }, + ); Either::Left(rx) } BlockOrPayload::Block(_) => Either::Right(()), @@ -804,7 +809,9 @@ where let (receipt_tx, receipt_rx) = crossbeam_channel::unbounded(); let (result_tx, result_rx) = tokio::sync::oneshot::channel(); let task_handle = ReceiptRootTaskHandle::new(receipt_rx, result_tx); - self.payload_processor.executor().spawn_blocking(move || task_handle.run(receipts_len)); + self.payload_processor + .executor() + .spawn_blocking_named("receipt-root", move || task_handle.run(receipts_len)); let transaction_count = input.transaction_count(); let executor = executor.with_state_hook(Some(Box::new(handle.state_hook()))); @@ -1015,7 +1022,7 @@ where let seq_overlay = overlay_factory; let seq_hashed_state = hashed_state.clone(); - self.payload_processor.executor().spawn_blocking(move || { + self.payload_processor.executor().spawn_blocking_named("serial-root", move || { let result = Self::compute_state_root_serial(seq_overlay, &seq_hashed_state); let _ = seq_tx.send(result); }); @@ -1559,7 +1566,9 @@ where }; // Spawn task that computes trie data asynchronously. - self.payload_processor.executor().spawn_blocking(compute_trie_input_task); + self.payload_processor + .executor() + .spawn_blocking_named("trie-input", compute_trie_input_task); ExecutedBlock::with_deferred_trie_data( Arc::new(block), diff --git a/crates/tasks/Cargo.toml b/crates/tasks/Cargo.toml index 90a9f19fa2..17e0fab1d5 100644 --- a/crates/tasks/Cargo.toml +++ b/crates/tasks/Cargo.toml @@ -22,6 +22,7 @@ reth-metrics.workspace = true metrics.workspace = true # misc +dashmap.workspace = true quanta.workspace = true tracing.workspace = true thiserror.workspace = true diff --git a/crates/tasks/src/lib.rs b/crates/tasks/src/lib.rs index 0b244be9e2..61639e4e5f 100644 --- a/crates/tasks/src/lib.rs +++ b/crates/tasks/src/lib.rs @@ -33,6 +33,7 @@ use tracing::debug; pub mod metrics; pub mod runtime; pub mod shutdown; +pub(crate) mod worker_map; #[cfg(feature = "rayon")] pub mod pool; diff --git a/crates/tasks/src/runtime.rs b/crates/tasks/src/runtime.rs index 9cd9b11f97..c6b0abaaad 100644 --- a/crates/tasks/src/runtime.rs +++ b/crates/tasks/src/runtime.rs @@ -11,6 +11,7 @@ use crate::pool::{BlockingTaskGuard, BlockingTaskPool, WorkerPool}; use crate::{ metrics::{IncCounterOnDrop, TaskExecutorMetrics}, shutdown::{GracefulShutdown, GracefulShutdownGuard, Shutdown}, + worker_map::WorkerMap, PanickedTaskError, TaskEvent, TaskManager, }; use futures_util::{future::select, Future, FutureExt, TryFutureExt}; @@ -260,6 +261,9 @@ struct RuntimeInner { /// Prewarming pool (execution prewarming workers). #[cfg(feature = "rayon")] prewarming_pool: WorkerPool, + /// Named single-thread worker map. Each unique name gets a dedicated OS thread + /// that is reused across all tasks submitted under that name. + worker_map: WorkerMap, /// Handle to the spawned [`TaskManager`] background task. /// The task monitors critical tasks for panics and fires the shutdown signal. /// Can be taken via [`Runtime::take_task_manager_handle`] to poll for panic errors. @@ -474,6 +478,26 @@ impl Runtime { self.0.handle.spawn_blocking(func) } + /// Spawns a blocking closure on a dedicated, named OS thread. + /// + /// Unlike [`spawn_blocking`](Self::spawn_blocking) which uses tokio's blocking thread pool, + /// this reuses the same OS thread for all tasks submitted under the same `name`. The thread + /// is created lazily on first use and its OS thread name is set to `name`. + /// + /// This is useful for tasks that benefit from running on a stable thread, e.g. for + /// thread-local state reuse or to avoid thread creation overhead on hot paths. + pub fn spawn_blocking_named( + &self, + name: &'static str, + func: F, + ) -> tokio::sync::oneshot::Receiver + where + F: FnOnce() -> R + Send + 'static, + R: Send + 'static, + { + self.0.worker_map.spawn_on(name, func) + } + /// Spawns the task onto the runtime. /// The given future resolves as soon as the [Shutdown] signal is received. /// @@ -858,6 +882,7 @@ impl RuntimeBuilder { proof_account_worker_pool, #[cfg(feature = "rayon")] prewarming_pool, + worker_map: WorkerMap::new(), task_manager_handle: Mutex::new(Some(task_manager_handle)), _quanta_upkeep: quanta_upkeep, }; diff --git a/crates/tasks/src/worker_map.rs b/crates/tasks/src/worker_map.rs new file mode 100644 index 0000000000..386253d55d --- /dev/null +++ b/crates/tasks/src/worker_map.rs @@ -0,0 +1,166 @@ +//! A map of named single-thread worker pools. +//! +//! Each worker is a dedicated OS thread that processes closures sent to it via a channel. +//! This is a substitute for `spawn_blocking` that reuses the same OS thread for the same +//! named task, like a 1-thread thread pool keyed by name. + +use dashmap::DashMap; +use std::thread; +use tokio::sync::{mpsc, oneshot}; + +type BoxedTask = Box; + +/// A single-thread worker that processes closures sequentially on a dedicated OS thread. +struct WorkerThread { + /// Sender to submit work to this worker's thread. + tx: mpsc::UnboundedSender, + /// The OS thread handle. Taken during shutdown to join. + handle: Option>, +} + +impl WorkerThread { + /// Spawns a new worker thread with the given name. + fn new(name: &'static str) -> Self { + let (tx, mut rx) = mpsc::unbounded_channel::(); + let handle = thread::Builder::new() + .name(name.to_string()) + .spawn(move || { + while let Some(task) = rx.blocking_recv() { + task(); + } + }) + .unwrap_or_else(|e| panic!("failed to spawn worker thread {name:?}: {e}")); + + Self { tx, handle: Some(handle) } + } +} + +/// A map of named single-thread workers. +/// +/// Each unique name gets a dedicated OS thread that is reused for all tasks submitted under +/// that name. Workers are created lazily on first use. +pub(crate) struct WorkerMap { + workers: DashMap<&'static str, WorkerThread>, +} + +impl Default for WorkerMap { + fn default() -> Self { + Self::new() + } +} + +impl WorkerMap { + /// Creates a new empty `WorkerMap`. + pub(crate) fn new() -> Self { + Self { workers: DashMap::new() } + } + + /// Spawns a closure on the dedicated worker thread for the given name. + /// + /// If no worker thread exists for this name yet, one is created with the given name as + /// the OS thread name. The closure executes on the worker's OS thread and the returned + /// future resolves with the result. + pub(crate) fn spawn_on(&self, name: &'static str, f: F) -> oneshot::Receiver + where + F: FnOnce() -> R + Send + 'static, + R: Send + 'static, + { + let (result_tx, result_rx) = oneshot::channel(); + + let task: BoxedTask = Box::new(move || { + let _ = result_tx.send(f()); + }); + + let worker = self.workers.entry(name).or_insert_with(|| WorkerThread::new(name)); + let _ = worker.tx.send(task); + + result_rx + } +} + +impl Drop for WorkerMap { + fn drop(&mut self) { + for (_, mut w) in std::mem::take(&mut self.workers) { + // Drop sender so the thread's recv loop exits, then join. + drop(w.tx); + if let Some(handle) = w.handle.take() { + let _ = handle.join(); + } + } + } +} + +impl std::fmt::Debug for WorkerMap { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("WorkerMap").field("num_workers", &self.workers.len()).finish() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn worker_map_basic() { + let map = WorkerMap::new(); + + let result = map.spawn_on("test", || 42).await.unwrap(); + assert_eq!(result, 42); + } + + #[tokio::test] + async fn worker_map_same_thread() { + let map = WorkerMap::new(); + + let id1 = map.spawn_on("test", || thread::current().id()).await.unwrap(); + let id2 = map.spawn_on("test", || thread::current().id()).await.unwrap(); + assert_eq!(id1, id2, "same name should run on the same thread"); + } + + #[tokio::test] + async fn worker_map_different_names_different_threads() { + let map = WorkerMap::new(); + + let id1 = map.spawn_on("worker-a", || thread::current().id()).await.unwrap(); + let id2 = map.spawn_on("worker-b", || thread::current().id()).await.unwrap(); + assert_ne!(id1, id2, "different names should run on different threads"); + } + + #[tokio::test] + async fn worker_map_sequential_execution() { + use std::sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }; + + let map = WorkerMap::new(); + let counter = Arc::new(AtomicUsize::new(0)); + + let mut receivers = Vec::new(); + for i in 0..10 { + let c = counter.clone(); + let rx = map.spawn_on("sequential", move || { + let val = c.fetch_add(1, Ordering::SeqCst); + assert_eq!(val, i, "tasks should execute in order"); + val + }); + receivers.push(rx); + } + + for (i, rx) in receivers.into_iter().enumerate() { + let val = rx.await.unwrap(); + assert_eq!(val, i); + } + } + + #[tokio::test] + async fn worker_map_thread_name() { + let map = WorkerMap::new(); + + let name = map + .spawn_on("custom-worker", || thread::current().name().unwrap().to_string()) + .await + .unwrap(); + assert_eq!(name, "custom-worker"); + } +}