mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-04-30 03:01:58 -04:00
Compare commits
2 Commits
t4
...
yk/thread_
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
54d07c55e5 | ||
|
|
a8d0a89a62 |
2
Cargo.lock
generated
2
Cargo.lock
generated
@@ -8244,6 +8244,7 @@ dependencies = [
|
||||
"derive_more",
|
||||
"eyre",
|
||||
"futures",
|
||||
"libc",
|
||||
"metrics",
|
||||
"metrics-util",
|
||||
"mini-moka",
|
||||
@@ -10995,6 +10996,7 @@ dependencies = [
|
||||
"dashmap 6.1.0",
|
||||
"derive_more",
|
||||
"itertools 0.14.0",
|
||||
"libc",
|
||||
"metrics",
|
||||
"proptest",
|
||||
"proptest-arbitrary-interop",
|
||||
|
||||
@@ -67,6 +67,9 @@ derive_more.workspace = true
|
||||
parking_lot.workspace = true
|
||||
crossbeam-channel.workspace = true
|
||||
|
||||
[target.'cfg(unix)'.dependencies]
|
||||
libc = "0.2"
|
||||
|
||||
# optional deps for test-utils
|
||||
reth-prune-types = { workspace = true, optional = true }
|
||||
reth-stages = { workspace = true, optional = true }
|
||||
|
||||
@@ -6,6 +6,34 @@ use tokio::{
|
||||
task::JoinHandle,
|
||||
};
|
||||
|
||||
/// Sets the current thread's name for profiling visibility.
|
||||
#[inline]
|
||||
fn set_thread_name(name: &str) {
|
||||
#[cfg(target_os = "linux")]
|
||||
{
|
||||
// SAFETY: name is a valid string, prctl with PR_SET_NAME is safe
|
||||
unsafe {
|
||||
// PR_SET_NAME expects a null-terminated string, truncated to 16 bytes (including null)
|
||||
let mut buf = [0u8; 16];
|
||||
let len = name.len().min(15);
|
||||
buf[..len].copy_from_slice(&name.as_bytes()[..len]);
|
||||
libc::prctl(libc::PR_SET_NAME, buf.as_ptr());
|
||||
}
|
||||
}
|
||||
#[cfg(target_os = "macos")]
|
||||
{
|
||||
// SAFETY: name is a valid string
|
||||
unsafe {
|
||||
let c_name = std::ffi::CString::new(name).unwrap_or_default();
|
||||
libc::pthread_setname_np(c_name.as_ptr());
|
||||
}
|
||||
}
|
||||
#[cfg(not(any(target_os = "linux", target_os = "macos")))]
|
||||
{
|
||||
let _ = name;
|
||||
}
|
||||
}
|
||||
|
||||
/// An executor for mixed I/O and CPU workloads.
|
||||
///
|
||||
/// This type uses tokio to spawn blocking tasks and will reuse an existing tokio
|
||||
@@ -36,6 +64,22 @@ impl WorkloadExecutor {
|
||||
{
|
||||
self.inner.handle.spawn_blocking(func)
|
||||
}
|
||||
|
||||
/// Spawns a blocking task with a descriptive thread name for profiling.
|
||||
///
|
||||
/// Sets the thread name at runtime, making it identifiable in profiling tools like Samply.
|
||||
/// Uses Tokio's blocking thread pool for efficiency.
|
||||
#[track_caller]
|
||||
pub fn spawn_blocking_named<F, R>(&self, name: &'static str, func: F) -> JoinHandle<R>
|
||||
where
|
||||
F: FnOnce() -> R + Send + 'static,
|
||||
R: Send + 'static,
|
||||
{
|
||||
self.inner.handle.spawn_blocking(move || {
|
||||
set_thread_name(name);
|
||||
func()
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
|
||||
@@ -296,7 +296,7 @@ where
|
||||
|
||||
// spawn multi-proof task
|
||||
let parent_span = span.clone();
|
||||
self.executor.spawn_blocking(move || {
|
||||
self.executor.spawn_blocking_named("reth-multiproof", move || {
|
||||
let _enter = parent_span.entered();
|
||||
// Build a state provider for the multiproof task
|
||||
let provider = provider_builder.build().expect("failed to build provider");
|
||||
@@ -362,7 +362,7 @@ where
|
||||
let (execute_tx, execute_rx) = mpsc::channel();
|
||||
|
||||
// Spawn a task that `convert`s all transactions in parallel and sends them out-of-order.
|
||||
self.executor.spawn_blocking(move || {
|
||||
self.executor.spawn_blocking_named("reth-tx-conv", move || {
|
||||
transactions.enumerate().for_each_with(ooo_tx, |ooo_tx, (idx, tx)| {
|
||||
let tx = convert(tx);
|
||||
let tx = tx.map(|tx| WithTxEnv { tx_env: tx.to_tx_env(), tx: Arc::new(tx) });
|
||||
@@ -376,7 +376,7 @@ where
|
||||
|
||||
// Spawn a task that processes out-of-order transactions from the task above and sends them
|
||||
// to the execution task in order.
|
||||
self.executor.spawn_blocking(move || {
|
||||
self.executor.spawn_blocking_named("reth-tx-order", move || {
|
||||
let mut next_for_execution = 0;
|
||||
let mut queue = BTreeMap::new();
|
||||
while let Ok((idx, tx)) = ooo_rx.recv() {
|
||||
@@ -450,7 +450,7 @@ where
|
||||
// spawn pre-warm task
|
||||
{
|
||||
let to_prewarm_task = to_prewarm_task.clone();
|
||||
self.executor.spawn_blocking(move || {
|
||||
self.executor.spawn_blocking_named("reth-prewarm", move || {
|
||||
prewarm_task.run(transactions, to_prewarm_task);
|
||||
});
|
||||
}
|
||||
@@ -515,7 +515,7 @@ where
|
||||
);
|
||||
|
||||
let span = Span::current();
|
||||
self.executor.spawn_blocking(move || {
|
||||
self.executor.spawn_blocking_named("reth-sparse", move || {
|
||||
let _enter = span.entered();
|
||||
|
||||
let (result, trie) = task.run();
|
||||
|
||||
@@ -149,7 +149,7 @@ where
|
||||
let transaction_count_hint = self.transaction_count_hint;
|
||||
let span = Span::current();
|
||||
|
||||
self.executor.spawn_blocking(move || {
|
||||
self.executor.spawn_blocking_named("reth-prewarm", move || {
|
||||
let _enter = debug_span!(target: "engine::tree::payload_processor::prewarm", parent: span, "spawn_all").entered();
|
||||
|
||||
let (done_tx, done_rx) = mpsc::channel();
|
||||
@@ -551,7 +551,7 @@ where
|
||||
let span =
|
||||
debug_span!(target: "engine::tree::payload_processor::prewarm", "prewarm worker", idx);
|
||||
|
||||
executor.spawn_blocking(move || {
|
||||
executor.spawn_blocking_named("reth-prewarm-w", move || {
|
||||
let _enter = span.entered();
|
||||
ctx.transact_batch(rx, actions_tx, done_tx);
|
||||
});
|
||||
|
||||
@@ -36,6 +36,9 @@ itertools.workspace = true
|
||||
tokio = { workspace = true, features = ["rt-multi-thread"] }
|
||||
crossbeam-channel.workspace = true
|
||||
|
||||
[target.'cfg(unix)'.dependencies]
|
||||
libc = "0.2"
|
||||
|
||||
# `metrics` feature
|
||||
reth-metrics = { workspace = true, optional = true }
|
||||
metrics = { workspace = true, optional = true }
|
||||
|
||||
@@ -262,7 +262,6 @@ mod tests {
|
||||
use reth_provider::{test_utils::create_test_provider_factory, HashingWriter};
|
||||
use reth_trie::proof::Proof;
|
||||
use reth_trie_db::{DatabaseHashedCursorFactory, DatabaseTrieCursorFactory};
|
||||
use tokio::runtime::Runtime;
|
||||
|
||||
#[test]
|
||||
fn random_parallel_proof() {
|
||||
@@ -326,7 +325,7 @@ mod tests {
|
||||
let trie_cursor_factory = DatabaseTrieCursorFactory::new(provider_rw.tx_ref());
|
||||
let hashed_cursor_factory = DatabaseHashedCursorFactory::new(provider_rw.tx_ref());
|
||||
|
||||
let rt = Runtime::new().unwrap();
|
||||
let rt = tokio::runtime::Runtime::new().unwrap();
|
||||
|
||||
let factory = reth_provider::providers::OverlayStateProviderFactory::new(factory);
|
||||
let task_ctx = ProofTaskCtx::new(factory);
|
||||
|
||||
@@ -71,6 +71,33 @@ use std::{
|
||||
use tokio::runtime::Handle;
|
||||
use tracing::{debug, debug_span, error, trace};
|
||||
|
||||
/// Sets the current thread's name for profiling visibility.
|
||||
#[inline]
|
||||
fn set_thread_name(name: &str) {
|
||||
#[cfg(target_os = "linux")]
|
||||
{
|
||||
// SAFETY: prctl with PR_SET_NAME is safe with a valid string pointer
|
||||
unsafe {
|
||||
let mut buf = [0u8; 16];
|
||||
let len = name.len().min(15);
|
||||
buf[..len].copy_from_slice(&name.as_bytes()[..len]);
|
||||
libc::prctl(libc::PR_SET_NAME, buf.as_ptr());
|
||||
}
|
||||
}
|
||||
#[cfg(target_os = "macos")]
|
||||
{
|
||||
// SAFETY: pthread_setname_np is safe with a valid CString
|
||||
unsafe {
|
||||
let c_name = std::ffi::CString::new(name).unwrap_or_default();
|
||||
libc::pthread_setname_np(c_name.as_ptr());
|
||||
}
|
||||
}
|
||||
#[cfg(not(any(target_os = "linux", target_os = "macos")))]
|
||||
{
|
||||
let _ = name;
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "metrics")]
|
||||
use crate::proof_task_metrics::{
|
||||
ProofTaskCursorMetrics, ProofTaskCursorMetricsCache, ProofTaskTrieMetrics,
|
||||
@@ -151,6 +178,7 @@ impl ProofWorkerHandle {
|
||||
let storage_available_workers_clone = storage_available_workers.clone();
|
||||
|
||||
executor.spawn_blocking(move || {
|
||||
set_thread_name("reth-stor-proof");
|
||||
#[cfg(feature = "metrics")]
|
||||
let metrics = ProofTaskTrieMetrics::default();
|
||||
#[cfg(feature = "metrics")]
|
||||
@@ -191,6 +219,7 @@ impl ProofWorkerHandle {
|
||||
let account_available_workers_clone = account_available_workers.clone();
|
||||
|
||||
executor.spawn_blocking(move || {
|
||||
set_thread_name("reth-acct-proof");
|
||||
#[cfg(feature = "metrics")]
|
||||
let metrics = ProofTaskTrieMetrics::default();
|
||||
#[cfg(feature = "metrics")]
|
||||
@@ -1588,7 +1617,7 @@ enum AccountWorkerJob {
|
||||
mod tests {
|
||||
use super::*;
|
||||
use reth_provider::test_utils::create_test_provider_factory;
|
||||
use tokio::{runtime::Builder, task};
|
||||
use tokio::runtime::Builder;
|
||||
|
||||
fn test_ctx<Factory>(factory: Factory) -> ProofTaskCtx<Factory> {
|
||||
ProofTaskCtx::new(factory)
|
||||
@@ -1598,21 +1627,16 @@ mod tests {
|
||||
#[test]
|
||||
fn spawn_proof_workers_creates_handle() {
|
||||
let runtime = Builder::new_multi_thread().worker_threads(1).enable_all().build().unwrap();
|
||||
runtime.block_on(async {
|
||||
let handle = tokio::runtime::Handle::current();
|
||||
let provider_factory = create_test_provider_factory();
|
||||
let factory =
|
||||
reth_provider::providers::OverlayStateProviderFactory::new(provider_factory);
|
||||
let ctx = test_ctx(factory);
|
||||
let provider_factory = create_test_provider_factory();
|
||||
let factory = reth_provider::providers::OverlayStateProviderFactory::new(provider_factory);
|
||||
let ctx = test_ctx(factory);
|
||||
|
||||
let proof_handle = ProofWorkerHandle::new(handle.clone(), ctx, 5, 3);
|
||||
let proof_handle = ProofWorkerHandle::new(runtime.handle().clone(), ctx, 5, 3);
|
||||
|
||||
// Verify handle can be cloned
|
||||
let _cloned_handle = proof_handle.clone();
|
||||
// Verify handle can be cloned
|
||||
let _cloned_handle = proof_handle.clone();
|
||||
|
||||
// Workers shut down automatically when handle is dropped
|
||||
drop(proof_handle);
|
||||
task::yield_now().await;
|
||||
});
|
||||
// Workers shut down automatically when handle is dropped
|
||||
drop(proof_handle);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user