feat(engine): add descriptive thread names for spawn_blocking tasks

Closes #20430 by using prctl/pthread_setname_np to set thread names at runtime
inside spawn_blocking tasks, keeping Tokio's thread pooling while making tasks
identifiable in profilers like Samply.

Thread names (kept under 15-char Linux limit):
- reth-multiproof: Multi-proof computation task
- reth-sparse: Sparse trie task
- reth-prewarm: Prewarm task coordinator
- reth-prewarm-w: Prewarm worker threads
- reth-tx-conv: Transaction conversion task
- reth-tx-order: Transaction ordering task
- reth-stor-proof: Storage proof workers
- reth-acct-proof: Account proof workers

Changes:
- Add spawn_blocking_named() to WorkloadExecutor that uses set_thread_name()
- Add set_thread_name() helper using prctl (Linux) / pthread_setname_np (macOS)
- Update all engine blocking tasks to use named threads
- Update ProofWorkerHandle to set thread names inside spawn_blocking
This commit is contained in:
yongkangc
2025-12-18 08:49:41 +00:00
parent a8d0a89a62
commit 54d07c55e5
10 changed files with 101 additions and 47 deletions

2
Cargo.lock generated
View File

@@ -8244,6 +8244,7 @@ dependencies = [
"derive_more",
"eyre",
"futures",
"libc",
"metrics",
"metrics-util",
"mini-moka",
@@ -10995,6 +10996,7 @@ dependencies = [
"dashmap 6.1.0",
"derive_more",
"itertools 0.14.0",
"libc",
"metrics",
"proptest",
"proptest-arbitrary-interop",

View File

@@ -214,16 +214,7 @@ pub struct CliContext {
/// Creates a new default tokio multi-thread [Runtime](tokio::runtime::Runtime) with all features
/// enabled
pub fn tokio_runtime() -> Result<tokio::runtime::Runtime, std::io::Error> {
use std::sync::atomic::{AtomicUsize, Ordering};
static THREAD_IDX: AtomicUsize = AtomicUsize::new(0);
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.thread_name_fn(|| {
let idx = THREAD_IDX.fetch_add(1, Ordering::Relaxed);
format!("reth-node-{idx}")
})
.build()
tokio::runtime::Builder::new_multi_thread().enable_all().build()
}
/// Runs the given future to completion or until a critical task panicked.

View File

@@ -67,6 +67,9 @@ derive_more.workspace = true
parking_lot.workspace = true
crossbeam-channel.workspace = true
[target.'cfg(unix)'.dependencies]
libc = "0.2"
# optional deps for test-utils
reth-prune-types = { workspace = true, optional = true }
reth-stages = { workspace = true, optional = true }

View File

@@ -1,17 +1,39 @@
//! Executor for mixed I/O and CPU workloads.
use std::{
sync::{
atomic::{AtomicUsize, Ordering},
OnceLock,
},
time::Duration,
};
use std::{sync::OnceLock, time::Duration};
use tokio::{
runtime::{Builder, Handle, Runtime},
task::JoinHandle,
};
/// Sets the current thread's name for profiling visibility.
#[inline]
fn set_thread_name(name: &str) {
#[cfg(target_os = "linux")]
{
// SAFETY: name is a valid string, prctl with PR_SET_NAME is safe
unsafe {
// PR_SET_NAME expects a null-terminated string, truncated to 16 bytes (including null)
let mut buf = [0u8; 16];
let len = name.len().min(15);
buf[..len].copy_from_slice(&name.as_bytes()[..len]);
libc::prctl(libc::PR_SET_NAME, buf.as_ptr());
}
}
#[cfg(target_os = "macos")]
{
// SAFETY: name is a valid string
unsafe {
let c_name = std::ffi::CString::new(name).unwrap_or_default();
libc::pthread_setname_np(c_name.as_ptr());
}
}
#[cfg(not(any(target_os = "linux", target_os = "macos")))]
{
let _ = name;
}
}
/// An executor for mixed I/O and CPU workloads.
///
/// This type uses tokio to spawn blocking tasks and will reuse an existing tokio
@@ -42,6 +64,22 @@ impl WorkloadExecutor {
{
self.inner.handle.spawn_blocking(func)
}
/// Spawns a blocking task with a descriptive thread name for profiling.
///
/// Sets the thread name at runtime, making it identifiable in profiling tools like Samply.
/// Uses Tokio's blocking thread pool for efficiency.
#[track_caller]
pub fn spawn_blocking_named<F, R>(&self, name: &'static str, func: F) -> JoinHandle<R>
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
self.inner.handle.spawn_blocking(move || {
set_thread_name(name);
func()
})
}
}
#[derive(Debug, Clone)]
@@ -57,8 +95,6 @@ impl WorkloadExecutorInner {
static RT: OnceLock<Runtime> = OnceLock::new();
let rt = RT.get_or_init(|| {
static THREAD_IDX: AtomicUsize = AtomicUsize::new(0);
Builder::new_multi_thread()
.enable_all()
// Keep the threads alive for at least the block time, which is 12 seconds
@@ -68,10 +104,6 @@ impl WorkloadExecutorInner {
// new block, and instead reuse the existing
// threads.
.thread_keep_alive(Duration::from_secs(15))
.thread_name_fn(|| {
let idx = THREAD_IDX.fetch_add(1, Ordering::Relaxed);
format!("reth-eng-{idx}")
})
.build()
.unwrap()
});

View File

@@ -296,7 +296,7 @@ where
// spawn multi-proof task
let parent_span = span.clone();
self.executor.spawn_blocking(move || {
self.executor.spawn_blocking_named("reth-multiproof", move || {
let _enter = parent_span.entered();
// Build a state provider for the multiproof task
let provider = provider_builder.build().expect("failed to build provider");
@@ -362,7 +362,7 @@ where
let (execute_tx, execute_rx) = mpsc::channel();
// Spawn a task that `convert`s all transactions in parallel and sends them out-of-order.
self.executor.spawn_blocking(move || {
self.executor.spawn_blocking_named("reth-tx-conv", move || {
transactions.enumerate().for_each_with(ooo_tx, |ooo_tx, (idx, tx)| {
let tx = convert(tx);
let tx = tx.map(|tx| WithTxEnv { tx_env: tx.to_tx_env(), tx: Arc::new(tx) });
@@ -376,7 +376,7 @@ where
// Spawn a task that processes out-of-order transactions from the task above and sends them
// to the execution task in order.
self.executor.spawn_blocking(move || {
self.executor.spawn_blocking_named("reth-tx-order", move || {
let mut next_for_execution = 0;
let mut queue = BTreeMap::new();
while let Ok((idx, tx)) = ooo_rx.recv() {
@@ -450,7 +450,7 @@ where
// spawn pre-warm task
{
let to_prewarm_task = to_prewarm_task.clone();
self.executor.spawn_blocking(move || {
self.executor.spawn_blocking_named("reth-prewarm", move || {
prewarm_task.run(transactions, to_prewarm_task);
});
}
@@ -515,7 +515,7 @@ where
);
let span = Span::current();
self.executor.spawn_blocking(move || {
self.executor.spawn_blocking_named("reth-sparse", move || {
let _enter = span.entered();
let (result, trie) = task.run();

View File

@@ -149,7 +149,7 @@ where
let transaction_count_hint = self.transaction_count_hint;
let span = Span::current();
self.executor.spawn_blocking(move || {
self.executor.spawn_blocking_named("reth-prewarm", move || {
let _enter = debug_span!(target: "engine::tree::payload_processor::prewarm", parent: span, "spawn_all").entered();
let (done_tx, done_rx) = mpsc::channel();
@@ -551,7 +551,7 @@ where
let span =
debug_span!(target: "engine::tree::payload_processor::prewarm", "prewarm worker", idx);
executor.spawn_blocking(move || {
executor.spawn_blocking_named("reth-prewarm-w", move || {
let _enter = span.entered();
ctx.transact_batch(rx, actions_tx, done_tx);
});

View File

@@ -73,7 +73,7 @@ impl BlockingTaskPool {
/// If a different stack size or other parameters are needed, they can be configured via
/// [`rayon::ThreadPoolBuilder`] returned by [`Self::builder`].
pub fn build() -> Result<Self, rayon::ThreadPoolBuildError> {
Self::builder().thread_name(|i| format!("reth-blk-{i}")).build().map(Self::new)
Self::builder().build().map(Self::new)
}
/// Asynchronous wrapper around Rayon's

View File

@@ -36,6 +36,9 @@ itertools.workspace = true
tokio = { workspace = true, features = ["rt-multi-thread"] }
crossbeam-channel.workspace = true
[target.'cfg(unix)'.dependencies]
libc = "0.2"
# `metrics` feature
reth-metrics = { workspace = true, optional = true }
metrics = { workspace = true, optional = true }

View File

@@ -262,7 +262,6 @@ mod tests {
use reth_provider::{test_utils::create_test_provider_factory, HashingWriter};
use reth_trie::proof::Proof;
use reth_trie_db::{DatabaseHashedCursorFactory, DatabaseTrieCursorFactory};
use tokio::runtime::Runtime;
#[test]
fn random_parallel_proof() {
@@ -326,7 +325,7 @@ mod tests {
let trie_cursor_factory = DatabaseTrieCursorFactory::new(provider_rw.tx_ref());
let hashed_cursor_factory = DatabaseHashedCursorFactory::new(provider_rw.tx_ref());
let rt = Runtime::new().unwrap();
let rt = tokio::runtime::Runtime::new().unwrap();
let factory = reth_provider::providers::OverlayStateProviderFactory::new(factory);
let task_ctx = ProofTaskCtx::new(factory);

View File

@@ -71,6 +71,33 @@ use std::{
use tokio::runtime::Handle;
use tracing::{debug, debug_span, error, trace};
/// Sets the current thread's name for profiling visibility.
#[inline]
fn set_thread_name(name: &str) {
#[cfg(target_os = "linux")]
{
// SAFETY: prctl with PR_SET_NAME is safe with a valid string pointer
unsafe {
let mut buf = [0u8; 16];
let len = name.len().min(15);
buf[..len].copy_from_slice(&name.as_bytes()[..len]);
libc::prctl(libc::PR_SET_NAME, buf.as_ptr());
}
}
#[cfg(target_os = "macos")]
{
// SAFETY: pthread_setname_np is safe with a valid CString
unsafe {
let c_name = std::ffi::CString::new(name).unwrap_or_default();
libc::pthread_setname_np(c_name.as_ptr());
}
}
#[cfg(not(any(target_os = "linux", target_os = "macos")))]
{
let _ = name;
}
}
#[cfg(feature = "metrics")]
use crate::proof_task_metrics::{
ProofTaskCursorMetrics, ProofTaskCursorMetricsCache, ProofTaskTrieMetrics,
@@ -151,6 +178,7 @@ impl ProofWorkerHandle {
let storage_available_workers_clone = storage_available_workers.clone();
executor.spawn_blocking(move || {
set_thread_name("reth-stor-proof");
#[cfg(feature = "metrics")]
let metrics = ProofTaskTrieMetrics::default();
#[cfg(feature = "metrics")]
@@ -191,6 +219,7 @@ impl ProofWorkerHandle {
let account_available_workers_clone = account_available_workers.clone();
executor.spawn_blocking(move || {
set_thread_name("reth-acct-proof");
#[cfg(feature = "metrics")]
let metrics = ProofTaskTrieMetrics::default();
#[cfg(feature = "metrics")]
@@ -1588,7 +1617,7 @@ enum AccountWorkerJob {
mod tests {
use super::*;
use reth_provider::test_utils::create_test_provider_factory;
use tokio::{runtime::Builder, task};
use tokio::runtime::Builder;
fn test_ctx<Factory>(factory: Factory) -> ProofTaskCtx<Factory> {
ProofTaskCtx::new(factory)
@@ -1598,21 +1627,16 @@ mod tests {
#[test]
fn spawn_proof_workers_creates_handle() {
let runtime = Builder::new_multi_thread().worker_threads(1).enable_all().build().unwrap();
runtime.block_on(async {
let handle = tokio::runtime::Handle::current();
let provider_factory = create_test_provider_factory();
let factory =
reth_provider::providers::OverlayStateProviderFactory::new(provider_factory);
let ctx = test_ctx(factory);
let provider_factory = create_test_provider_factory();
let factory = reth_provider::providers::OverlayStateProviderFactory::new(provider_factory);
let ctx = test_ctx(factory);
let proof_handle = ProofWorkerHandle::new(handle.clone(), ctx, 5, 3);
let proof_handle = ProofWorkerHandle::new(runtime.handle().clone(), ctx, 5, 3);
// Verify handle can be cloned
let _cloned_handle = proof_handle.clone();
// Verify handle can be cloned
let _cloned_handle = proof_handle.clone();
// Workers shut down automatically when handle is dropped
drop(proof_handle);
task::yield_now().await;
});
// Workers shut down automatically when handle is dropped
drop(proof_handle);
}
}