mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-02-19 03:04:27 -05:00
feat(tasks): add WorkerMap for named single-thread workers (#22262)
Co-authored-by: Amp <amp@ampcode.com>
This commit is contained in:
committed by
GitHub
parent
47544d9a7e
commit
85d35fa6c0
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -10345,6 +10345,7 @@ name = "reth-tasks"
|
||||
version = "1.11.0"
|
||||
dependencies = [
|
||||
"crossbeam-utils",
|
||||
"dashmap",
|
||||
"futures-util",
|
||||
"metrics",
|
||||
"parking_lot",
|
||||
|
||||
@@ -496,7 +496,7 @@ where
|
||||
|
||||
{
|
||||
let to_prewarm_task = to_prewarm_task.clone();
|
||||
self.executor.spawn_blocking(move || {
|
||||
self.executor.spawn_blocking_named("prewarm", move || {
|
||||
let mode = if skip_prewarm {
|
||||
PrewarmMode::Skipped
|
||||
} else if let Some(bal) = bal {
|
||||
@@ -550,7 +550,7 @@ where
|
||||
let executor = self.executor.clone();
|
||||
|
||||
let parent_span = Span::current();
|
||||
self.executor.spawn_blocking(move || {
|
||||
self.executor.spawn_blocking_named("sparse-trie", move || {
|
||||
let _enter = debug_span!(target: "engine::tree::payload_processor", parent: parent_span, "sparse_trie_task")
|
||||
.entered();
|
||||
|
||||
|
||||
@@ -139,7 +139,7 @@ where
|
||||
let ctx = self.ctx.clone();
|
||||
let span = Span::current();
|
||||
|
||||
self.executor.spawn_blocking(move || {
|
||||
self.executor.spawn_blocking_named("prewarm-spawn", move || {
|
||||
let _enter = debug_span!(target: "engine::tree::payload_processor::prewarm", parent: span, "spawn_all").entered();
|
||||
|
||||
let pool_threads = executor.prewarming_pool().current_num_threads();
|
||||
|
||||
@@ -118,7 +118,7 @@ where
|
||||
let (hashed_state_tx, hashed_state_rx) = crossbeam_channel::unbounded();
|
||||
|
||||
let parent_span = tracing::Span::current();
|
||||
executor.spawn_blocking(move || {
|
||||
executor.spawn_blocking_named("trie-hashing", move || {
|
||||
let _span = debug_span!(parent: parent_span, "run_hashing_task").entered();
|
||||
Self::run_hashing_task(updates, hashed_state_tx)
|
||||
});
|
||||
|
||||
@@ -339,10 +339,15 @@ where
|
||||
let payload_clone = input.clone();
|
||||
let validator = self.validator.clone();
|
||||
let (tx, rx) = tokio::sync::oneshot::channel();
|
||||
self.payload_processor.executor().spawn_blocking(move || {
|
||||
let BlockOrPayload::Payload(payload) = payload_clone else { unreachable!() };
|
||||
let _ = tx.send(validator.convert_payload_to_block(payload));
|
||||
});
|
||||
self.payload_processor.executor().spawn_blocking_named(
|
||||
"payload-convert",
|
||||
move || {
|
||||
let BlockOrPayload::Payload(payload) = payload_clone else {
|
||||
unreachable!()
|
||||
};
|
||||
let _ = tx.send(validator.convert_payload_to_block(payload));
|
||||
},
|
||||
);
|
||||
Either::Left(rx)
|
||||
}
|
||||
BlockOrPayload::Block(_) => Either::Right(()),
|
||||
@@ -804,7 +809,9 @@ where
|
||||
let (receipt_tx, receipt_rx) = crossbeam_channel::unbounded();
|
||||
let (result_tx, result_rx) = tokio::sync::oneshot::channel();
|
||||
let task_handle = ReceiptRootTaskHandle::new(receipt_rx, result_tx);
|
||||
self.payload_processor.executor().spawn_blocking(move || task_handle.run(receipts_len));
|
||||
self.payload_processor
|
||||
.executor()
|
||||
.spawn_blocking_named("receipt-root", move || task_handle.run(receipts_len));
|
||||
|
||||
let transaction_count = input.transaction_count();
|
||||
let executor = executor.with_state_hook(Some(Box::new(handle.state_hook())));
|
||||
@@ -1015,7 +1022,7 @@ where
|
||||
|
||||
let seq_overlay = overlay_factory;
|
||||
let seq_hashed_state = hashed_state.clone();
|
||||
self.payload_processor.executor().spawn_blocking(move || {
|
||||
self.payload_processor.executor().spawn_blocking_named("serial-root", move || {
|
||||
let result = Self::compute_state_root_serial(seq_overlay, &seq_hashed_state);
|
||||
let _ = seq_tx.send(result);
|
||||
});
|
||||
@@ -1559,7 +1566,9 @@ where
|
||||
};
|
||||
|
||||
// Spawn task that computes trie data asynchronously.
|
||||
self.payload_processor.executor().spawn_blocking(compute_trie_input_task);
|
||||
self.payload_processor
|
||||
.executor()
|
||||
.spawn_blocking_named("trie-input", compute_trie_input_task);
|
||||
|
||||
ExecutedBlock::with_deferred_trie_data(
|
||||
Arc::new(block),
|
||||
|
||||
@@ -22,6 +22,7 @@ reth-metrics.workspace = true
|
||||
metrics.workspace = true
|
||||
|
||||
# misc
|
||||
dashmap.workspace = true
|
||||
quanta.workspace = true
|
||||
tracing.workspace = true
|
||||
thiserror.workspace = true
|
||||
|
||||
@@ -33,6 +33,7 @@ use tracing::debug;
|
||||
pub mod metrics;
|
||||
pub mod runtime;
|
||||
pub mod shutdown;
|
||||
pub(crate) mod worker_map;
|
||||
|
||||
#[cfg(feature = "rayon")]
|
||||
pub mod pool;
|
||||
|
||||
@@ -11,6 +11,7 @@ use crate::pool::{BlockingTaskGuard, BlockingTaskPool, WorkerPool};
|
||||
use crate::{
|
||||
metrics::{IncCounterOnDrop, TaskExecutorMetrics},
|
||||
shutdown::{GracefulShutdown, GracefulShutdownGuard, Shutdown},
|
||||
worker_map::WorkerMap,
|
||||
PanickedTaskError, TaskEvent, TaskManager,
|
||||
};
|
||||
use futures_util::{future::select, Future, FutureExt, TryFutureExt};
|
||||
@@ -260,6 +261,9 @@ struct RuntimeInner {
|
||||
/// Prewarming pool (execution prewarming workers).
|
||||
#[cfg(feature = "rayon")]
|
||||
prewarming_pool: WorkerPool,
|
||||
/// Named single-thread worker map. Each unique name gets a dedicated OS thread
|
||||
/// that is reused across all tasks submitted under that name.
|
||||
worker_map: WorkerMap,
|
||||
/// Handle to the spawned [`TaskManager`] background task.
|
||||
/// The task monitors critical tasks for panics and fires the shutdown signal.
|
||||
/// Can be taken via [`Runtime::take_task_manager_handle`] to poll for panic errors.
|
||||
@@ -474,6 +478,26 @@ impl Runtime {
|
||||
self.0.handle.spawn_blocking(func)
|
||||
}
|
||||
|
||||
/// Spawns a blocking closure on a dedicated, named OS thread.
|
||||
///
|
||||
/// Unlike [`spawn_blocking`](Self::spawn_blocking) which uses tokio's blocking thread pool,
|
||||
/// this reuses the same OS thread for all tasks submitted under the same `name`. The thread
|
||||
/// is created lazily on first use and its OS thread name is set to `name`.
|
||||
///
|
||||
/// This is useful for tasks that benefit from running on a stable thread, e.g. for
|
||||
/// thread-local state reuse or to avoid thread creation overhead on hot paths.
|
||||
pub fn spawn_blocking_named<F, R>(
|
||||
&self,
|
||||
name: &'static str,
|
||||
func: F,
|
||||
) -> tokio::sync::oneshot::Receiver<R>
|
||||
where
|
||||
F: FnOnce() -> R + Send + 'static,
|
||||
R: Send + 'static,
|
||||
{
|
||||
self.0.worker_map.spawn_on(name, func)
|
||||
}
|
||||
|
||||
/// Spawns the task onto the runtime.
|
||||
/// The given future resolves as soon as the [Shutdown] signal is received.
|
||||
///
|
||||
@@ -858,6 +882,7 @@ impl RuntimeBuilder {
|
||||
proof_account_worker_pool,
|
||||
#[cfg(feature = "rayon")]
|
||||
prewarming_pool,
|
||||
worker_map: WorkerMap::new(),
|
||||
task_manager_handle: Mutex::new(Some(task_manager_handle)),
|
||||
_quanta_upkeep: quanta_upkeep,
|
||||
};
|
||||
|
||||
166
crates/tasks/src/worker_map.rs
Normal file
166
crates/tasks/src/worker_map.rs
Normal file
@@ -0,0 +1,166 @@
|
||||
//! A map of named single-thread worker pools.
|
||||
//!
|
||||
//! Each worker is a dedicated OS thread that processes closures sent to it via a channel.
|
||||
//! This is a substitute for `spawn_blocking` that reuses the same OS thread for the same
|
||||
//! named task, like a 1-thread thread pool keyed by name.
|
||||
|
||||
use dashmap::DashMap;
|
||||
use std::thread;
|
||||
use tokio::sync::{mpsc, oneshot};
|
||||
|
||||
type BoxedTask = Box<dyn FnOnce() + Send + 'static>;
|
||||
|
||||
/// A single-thread worker that processes closures sequentially on a dedicated OS thread.
|
||||
struct WorkerThread {
|
||||
/// Sender to submit work to this worker's thread.
|
||||
tx: mpsc::UnboundedSender<BoxedTask>,
|
||||
/// The OS thread handle. Taken during shutdown to join.
|
||||
handle: Option<thread::JoinHandle<()>>,
|
||||
}
|
||||
|
||||
impl WorkerThread {
|
||||
/// Spawns a new worker thread with the given name.
|
||||
fn new(name: &'static str) -> Self {
|
||||
let (tx, mut rx) = mpsc::unbounded_channel::<BoxedTask>();
|
||||
let handle = thread::Builder::new()
|
||||
.name(name.to_string())
|
||||
.spawn(move || {
|
||||
while let Some(task) = rx.blocking_recv() {
|
||||
task();
|
||||
}
|
||||
})
|
||||
.unwrap_or_else(|e| panic!("failed to spawn worker thread {name:?}: {e}"));
|
||||
|
||||
Self { tx, handle: Some(handle) }
|
||||
}
|
||||
}
|
||||
|
||||
/// A map of named single-thread workers.
|
||||
///
|
||||
/// Each unique name gets a dedicated OS thread that is reused for all tasks submitted under
|
||||
/// that name. Workers are created lazily on first use.
|
||||
pub(crate) struct WorkerMap {
|
||||
workers: DashMap<&'static str, WorkerThread>,
|
||||
}
|
||||
|
||||
impl Default for WorkerMap {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
}
|
||||
}
|
||||
|
||||
impl WorkerMap {
|
||||
/// Creates a new empty `WorkerMap`.
|
||||
pub(crate) fn new() -> Self {
|
||||
Self { workers: DashMap::new() }
|
||||
}
|
||||
|
||||
/// Spawns a closure on the dedicated worker thread for the given name.
|
||||
///
|
||||
/// If no worker thread exists for this name yet, one is created with the given name as
|
||||
/// the OS thread name. The closure executes on the worker's OS thread and the returned
|
||||
/// future resolves with the result.
|
||||
pub(crate) fn spawn_on<F, R>(&self, name: &'static str, f: F) -> oneshot::Receiver<R>
|
||||
where
|
||||
F: FnOnce() -> R + Send + 'static,
|
||||
R: Send + 'static,
|
||||
{
|
||||
let (result_tx, result_rx) = oneshot::channel();
|
||||
|
||||
let task: BoxedTask = Box::new(move || {
|
||||
let _ = result_tx.send(f());
|
||||
});
|
||||
|
||||
let worker = self.workers.entry(name).or_insert_with(|| WorkerThread::new(name));
|
||||
let _ = worker.tx.send(task);
|
||||
|
||||
result_rx
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for WorkerMap {
|
||||
fn drop(&mut self) {
|
||||
for (_, mut w) in std::mem::take(&mut self.workers) {
|
||||
// Drop sender so the thread's recv loop exits, then join.
|
||||
drop(w.tx);
|
||||
if let Some(handle) = w.handle.take() {
|
||||
let _ = handle.join();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for WorkerMap {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("WorkerMap").field("num_workers", &self.workers.len()).finish()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[tokio::test]
|
||||
async fn worker_map_basic() {
|
||||
let map = WorkerMap::new();
|
||||
|
||||
let result = map.spawn_on("test", || 42).await.unwrap();
|
||||
assert_eq!(result, 42);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn worker_map_same_thread() {
|
||||
let map = WorkerMap::new();
|
||||
|
||||
let id1 = map.spawn_on("test", || thread::current().id()).await.unwrap();
|
||||
let id2 = map.spawn_on("test", || thread::current().id()).await.unwrap();
|
||||
assert_eq!(id1, id2, "same name should run on the same thread");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn worker_map_different_names_different_threads() {
|
||||
let map = WorkerMap::new();
|
||||
|
||||
let id1 = map.spawn_on("worker-a", || thread::current().id()).await.unwrap();
|
||||
let id2 = map.spawn_on("worker-b", || thread::current().id()).await.unwrap();
|
||||
assert_ne!(id1, id2, "different names should run on different threads");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn worker_map_sequential_execution() {
|
||||
use std::sync::{
|
||||
atomic::{AtomicUsize, Ordering},
|
||||
Arc,
|
||||
};
|
||||
|
||||
let map = WorkerMap::new();
|
||||
let counter = Arc::new(AtomicUsize::new(0));
|
||||
|
||||
let mut receivers = Vec::new();
|
||||
for i in 0..10 {
|
||||
let c = counter.clone();
|
||||
let rx = map.spawn_on("sequential", move || {
|
||||
let val = c.fetch_add(1, Ordering::SeqCst);
|
||||
assert_eq!(val, i, "tasks should execute in order");
|
||||
val
|
||||
});
|
||||
receivers.push(rx);
|
||||
}
|
||||
|
||||
for (i, rx) in receivers.into_iter().enumerate() {
|
||||
let val = rx.await.unwrap();
|
||||
assert_eq!(val, i);
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn worker_map_thread_name() {
|
||||
let map = WorkerMap::new();
|
||||
|
||||
let name = map
|
||||
.spawn_on("custom-worker", || thread::current().name().unwrap().to_string())
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(name, "custom-worker");
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user