chore(observability): add tokio runtime with custom thread naming (#18635)

Co-authored-by: Alexey Shekhirin <5773434+shekhirin@users.noreply.github.com>
This commit is contained in:
YK
2025-09-23 20:05:35 +08:00
committed by GitHub
parent 4c9942b920
commit 088a0d44c2
4 changed files with 36 additions and 5 deletions

View File

@@ -11,7 +11,15 @@
//! Entrypoint for running commands.
use reth_tasks::{TaskExecutor, TaskManager};
use std::{future::Future, pin::pin, sync::mpsc, time::Duration};
use std::{
future::Future,
pin::pin,
sync::{
atomic::{AtomicUsize, Ordering},
mpsc,
},
time::Duration,
};
use tracing::{debug, error, trace};
/// Executes CLI commands.
@@ -159,7 +167,14 @@ pub struct CliContext {
/// Creates a new default tokio multi-thread [Runtime](tokio::runtime::Runtime) with all features
/// enabled
pub fn tokio_runtime() -> Result<tokio::runtime::Runtime, std::io::Error> {
tokio::runtime::Builder::new_multi_thread().enable_all().build()
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.thread_name_fn(|| {
static IDX: AtomicUsize = AtomicUsize::new(0);
let id = IDX.fetch_add(1, Ordering::Relaxed);
format!("tokio-{id}")
})
.build()
}
/// Runs the given future to completion or until a critical task panicked.

View File

@@ -2,7 +2,10 @@
use rayon::ThreadPool as RayonPool;
use std::{
sync::{Arc, OnceLock},
sync::{
atomic::{AtomicUsize, Ordering},
Arc, OnceLock,
},
time::Duration,
};
use tokio::{
@@ -71,6 +74,7 @@ impl WorkloadExecutorInner {
Handle::try_current().unwrap_or_else(|_| {
// Create a new runtime if no runtime is available
static RT: OnceLock<Runtime> = OnceLock::new();
static THREAD_COUNTER: AtomicUsize = AtomicUsize::new(0);
let rt = RT.get_or_init(|| {
Builder::new_multi_thread()
@@ -82,6 +86,10 @@ impl WorkloadExecutorInner {
// new block, and instead reuse the existing
// threads.
.thread_keep_alive(Duration::from_secs(15))
.thread_name_fn(|| {
let id = THREAD_COUNTER.fetch_add(1, Ordering::Relaxed);
format!("tokio-payload-{id}")
})
.build()
.unwrap()
});

View File

@@ -72,7 +72,7 @@ impl BlockingTaskPool {
/// Uses [`rayon::ThreadPoolBuilder::build`](rayon::ThreadPoolBuilder::build) defaults but
/// increases the stack size to 8MB.
pub fn build() -> Result<Self, rayon::ThreadPoolBuildError> {
Self::builder().build().map(Self::new)
Self::builder().thread_name(|i| format!("rayon-{i}")).build().map(Self::new)
}
/// Asynchronous wrapper around Rayon's

View File

@@ -20,7 +20,10 @@ use reth_trie::{
use reth_trie_db::{DatabaseHashedCursorFactory, DatabaseTrieCursorFactory};
use std::{
collections::HashMap,
sync::{mpsc, Arc, OnceLock},
sync::{
atomic::{AtomicUsize, Ordering},
mpsc, Arc, OnceLock,
},
time::Duration,
};
use thiserror::Error;
@@ -283,6 +286,7 @@ fn get_runtime_handle() -> Handle {
Handle::try_current().unwrap_or_else(|_| {
// Create a new runtime if no runtime is available
static RT: OnceLock<Runtime> = OnceLock::new();
static THREAD_COUNTER: AtomicUsize = AtomicUsize::new(0);
let rt = RT.get_or_init(|| {
Builder::new_multi_thread()
@@ -290,6 +294,10 @@ fn get_runtime_handle() -> Handle {
// This prevents the costly process of spawning new threads on every
// new block, and instead reuses the existing threads.
.thread_keep_alive(Duration::from_secs(15))
.thread_name_fn(|| {
let id = THREAD_COUNTER.fetch_add(1, Ordering::Relaxed);
format!("tokio-trie-{id}")
})
.build()
.expect("Failed to create tokio runtime")
});