perf: use dedicated trie rayon pool for proof workers (#22051)

This commit is contained in:
DaniPopes
2026-02-11 23:10:17 +01:00
committed by GitHub
parent 04543ed16b
commit ff3a854326
9 changed files with 194 additions and 181 deletions

1
Cargo.lock generated
View File

@@ -8659,6 +8659,7 @@ dependencies = [
"reth-node-ethereum",
"reth-node-metrics",
"reth-rpc-server-types",
"reth-tasks",
"reth-tracing",
"tempfile",
"tracing",

View File

@@ -30,8 +30,14 @@ impl CliRunner {
///
/// The default runtime is multi-threaded, with both I/O and time drivers enabled.
pub fn try_default_runtime() -> Result<Self, reth_tasks::RuntimeBuildError> {
let runtime =
reth_tasks::RuntimeBuilder::new(reth_tasks::RuntimeConfig::default()).build()?;
Self::try_with_runtime_config(reth_tasks::RuntimeConfig::default())
}
/// Creates a new [`CliRunner`] with the given [`RuntimeConfig`](reth_tasks::RuntimeConfig).
pub fn try_with_runtime_config(
config: reth_tasks::RuntimeConfig,
) -> Result<Self, reth_tasks::RuntimeBuildError> {
let runtime = reth_tasks::RuntimeBuilder::new(config).build()?;
Ok(Self { config: CliRunnerConfig::default(), runtime })
}

View File

@@ -287,15 +287,7 @@ where
// Create and spawn the storage proof task
let task_ctx = ProofTaskCtx::new(multiproof_provider_factory);
let storage_worker_count = config.storage_worker_count();
let account_worker_count = config.account_worker_count();
let proof_handle = ProofWorkerHandle::new(
&self.executor,
task_ctx,
storage_worker_count,
account_worker_count,
v2_proofs_enabled,
);
let proof_handle = ProofWorkerHandle::new(&self.executor, task_ctx, v2_proofs_enabled);
if config.disable_trie_cache() {
let multi_proof_task = MultiProofTask::new(

View File

@@ -1571,7 +1571,7 @@ mod tests {
let changeset_cache = ChangesetCache::new();
let overlay_factory = OverlayStateProviderFactory::new(factory, changeset_cache);
let task_ctx = ProofTaskCtx::new(overlay_factory);
let proof_handle = ProofWorkerHandle::new(runtime, task_ctx, 1, 1, false);
let proof_handle = ProofWorkerHandle::new(runtime, task_ctx, false);
let (to_sparse_trie, _receiver) = std::sync::mpsc::channel();
let (tx, rx) = crossbeam_channel::unbounded();

View File

@@ -22,6 +22,7 @@ reth-node-core.workspace = true
reth-node-ethereum.workspace = true
reth-node-metrics.workspace = true
reth-rpc-server-types.workspace = true
reth-tasks.workspace = true
reth-tracing.workspace = true
reth-node-api.workspace = true

View File

@@ -17,6 +17,7 @@ use reth_node_builder::{NodeBuilder, WithLaunchContext};
use reth_node_ethereum::{consensus::EthBeaconConsensus, EthEvmConfig, EthereumNode};
use reth_node_metrics::recorder::install_prometheus_recorder;
use reth_rpc_server_types::RpcModuleValidator;
use reth_tasks::RayonConfig;
use reth_tracing::{FileWorkerGuard, Layers};
use std::{fmt, sync::Arc};
@@ -153,6 +154,16 @@ where
Rpc::validate_selection(ws_api, "ws.api").map_err(|e| eyre!("{e}"))?;
}
let rayon_config = RayonConfig {
reserved_cpu_cores: command.engine.reserved_cpu_cores,
proof_storage_worker_threads: command.engine.storage_worker_count,
proof_account_worker_threads: command.engine.account_worker_count,
..Default::default()
};
let runner = CliRunner::try_with_runtime_config(
reth_tasks::RuntimeConfig::default().with_rayon(rayon_config),
)?;
runner.run_command_until_exit(|ctx| {
command.execute(ctx, FnLauncher::new::<C, Ext>(launcher))
})

View File

@@ -99,14 +99,17 @@ pub struct RayonConfig {
/// Number of threads for the RPC blocking pool (trace calls, `eth_getProof`, etc.).
/// If `None`, uses the same as `cpu_threads`.
pub rpc_threads: Option<usize>,
/// Number of threads for the trie proof computation pool.
/// If `None`, uses the same as `cpu_threads`.
pub trie_threads: Option<usize>,
/// Number of threads for the storage I/O pool (static file, `RocksDB` writes in
/// `save_blocks`). If `None`, uses [`DEFAULT_STORAGE_POOL_THREADS`].
pub storage_threads: Option<usize>,
/// Maximum number of concurrent blocking tasks for the RPC guard semaphore.
pub max_blocking_tasks: usize,
/// Number of threads for the proof storage worker pool (trie storage proof workers).
/// If `None`, derived from available parallelism.
pub proof_storage_worker_threads: Option<usize>,
/// Number of threads for the proof account worker pool (trie account proof workers).
/// If `None`, derived from available parallelism.
pub proof_account_worker_threads: Option<usize>,
}
#[cfg(feature = "rayon")]
@@ -116,9 +119,10 @@ impl Default for RayonConfig {
cpu_threads: None,
reserved_cpu_cores: DEFAULT_RESERVED_CPU_CORES,
rpc_threads: None,
trie_threads: None,
storage_threads: None,
max_blocking_tasks: DEFAULT_MAX_BLOCKING_TASKS,
proof_storage_worker_threads: None,
proof_account_worker_threads: None,
}
}
}
@@ -143,18 +147,30 @@ impl RayonConfig {
self
}
/// Set the number of threads for the trie proof pool.
pub const fn with_trie_threads(mut self, trie_threads: usize) -> Self {
self.trie_threads = Some(trie_threads);
self
}
/// Set the number of threads for the storage I/O pool.
pub const fn with_storage_threads(mut self, storage_threads: usize) -> Self {
self.storage_threads = Some(storage_threads);
self
}
/// Set the number of threads for the proof storage worker pool.
pub const fn with_proof_storage_worker_threads(
mut self,
proof_storage_worker_threads: usize,
) -> Self {
self.proof_storage_worker_threads = Some(proof_storage_worker_threads);
self
}
/// Set the number of threads for the proof account worker pool.
pub const fn with_proof_account_worker_threads(
mut self,
proof_account_worker_threads: usize,
) -> Self {
self.proof_account_worker_threads = Some(proof_account_worker_threads);
self
}
/// Compute the default number of threads based on available parallelism.
fn default_thread_count(&self) -> usize {
self.cpu_threads.unwrap_or_else(|| {
@@ -232,15 +248,18 @@ struct RuntimeInner {
/// RPC blocking pool.
#[cfg(feature = "rayon")]
rpc_pool: BlockingTaskPool,
/// Trie proof computation pool.
#[cfg(feature = "rayon")]
trie_pool: rayon::ThreadPool,
/// Storage I/O pool.
#[cfg(feature = "rayon")]
storage_pool: rayon::ThreadPool,
/// Rate limiter for expensive RPC operations.
#[cfg(feature = "rayon")]
blocking_guard: BlockingTaskGuard,
/// Proof storage worker pool (trie storage proof computation).
#[cfg(feature = "rayon")]
proof_storage_worker_pool: rayon::ThreadPool,
/// Proof account worker pool (trie account proof computation).
#[cfg(feature = "rayon")]
proof_account_worker_pool: rayon::ThreadPool,
/// Handle to the spawned [`TaskManager`] background task.
/// The task monitors critical tasks for panics and fires the shutdown signal.
/// Can be taken via [`Runtime::take_task_manager_handle`] to poll for panic errors.
@@ -313,12 +332,6 @@ impl Runtime {
&self.0.rpc_pool
}
/// Get the trie proof computation pool.
#[cfg(feature = "rayon")]
pub fn trie_pool(&self) -> &rayon::ThreadPool {
&self.0.trie_pool
}
/// Get the storage I/O pool.
#[cfg(feature = "rayon")]
pub fn storage_pool(&self) -> &rayon::ThreadPool {
@@ -331,34 +344,16 @@ impl Runtime {
self.0.blocking_guard.clone()
}
/// Run a closure on the CPU pool, blocking the current thread until completion.
/// Get the proof storage worker pool.
#[cfg(feature = "rayon")]
pub fn install_cpu<F, R>(&self, f: F) -> R
where
F: FnOnce() -> R + Send,
R: Send,
{
self.cpu_pool().install(f)
pub fn proof_storage_worker_pool(&self) -> &rayon::ThreadPool {
&self.0.proof_storage_worker_pool
}
/// Spawn a CPU-bound task on the RPC pool and return an async handle.
/// Get the proof account worker pool.
#[cfg(feature = "rayon")]
pub fn spawn_rpc<F, R>(&self, f: F) -> crate::pool::BlockingTaskHandle<R>
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
self.rpc_pool().spawn(f)
}
/// Run a closure on the trie pool, blocking the current thread until completion.
#[cfg(feature = "rayon")]
pub fn install_trie<F, R>(&self, f: F) -> R
where
F: FnOnce() -> R + Send,
R: Send,
{
self.trie_pool().install(f)
pub fn proof_account_worker_pool(&self) -> &rayon::ThreadPool {
&self.0.proof_account_worker_pool
}
}
@@ -395,9 +390,10 @@ impl Runtime {
cpu_threads: Some(2),
reserved_cpu_cores: 0,
rpc_threads: Some(2),
trie_threads: Some(2),
storage_threads: Some(2),
max_blocking_tasks: 16,
proof_storage_worker_threads: Some(2),
proof_account_worker_threads: Some(2),
},
}
}
@@ -821,46 +817,69 @@ impl RuntimeBuilder {
TaskManager::new_parts(handle.clone());
#[cfg(feature = "rayon")]
let (cpu_pool, rpc_pool, trie_pool, storage_pool, blocking_guard) = {
let (
cpu_pool,
rpc_pool,
storage_pool,
blocking_guard,
proof_storage_worker_pool,
proof_account_worker_pool,
) = {
let default_threads = config.rayon.default_thread_count();
let rpc_threads = config.rayon.rpc_threads.unwrap_or(default_threads);
let trie_threads = config.rayon.trie_threads.unwrap_or(default_threads);
let cpu_pool = rayon::ThreadPoolBuilder::new()
.num_threads(default_threads)
.thread_name(|i| format!("reth-cpu-{i}"))
.thread_name(|i| format!("cpu-{i}"))
.build()?;
let rpc_raw = rayon::ThreadPoolBuilder::new()
.num_threads(rpc_threads)
.thread_name(|i| format!("reth-rpc-{i}"))
.thread_name(|i| format!("rpc-{i}"))
.build()?;
let rpc_pool = BlockingTaskPool::new(rpc_raw);
let trie_pool = rayon::ThreadPoolBuilder::new()
.num_threads(trie_threads)
.thread_name(|i| format!("reth-trie-{i}"))
.build()?;
let storage_threads =
config.rayon.storage_threads.unwrap_or(DEFAULT_STORAGE_POOL_THREADS);
let storage_pool = rayon::ThreadPoolBuilder::new()
.num_threads(storage_threads)
.thread_name(|i| format!("reth-storage-{i}"))
.thread_name(|i| format!("storage-{i}"))
.build()?;
let blocking_guard = BlockingTaskGuard::new(config.rayon.max_blocking_tasks);
let proof_storage_worker_threads =
config.rayon.proof_storage_worker_threads.unwrap_or(default_threads);
let proof_storage_worker_pool = rayon::ThreadPoolBuilder::new()
.num_threads(proof_storage_worker_threads)
.thread_name(|i| format!("proof-strg-{i}"))
.build()?;
let proof_account_worker_threads =
config.rayon.proof_account_worker_threads.unwrap_or(default_threads);
let proof_account_worker_pool = rayon::ThreadPoolBuilder::new()
.num_threads(proof_account_worker_threads)
.thread_name(|i| format!("proof-acct-{i}"))
.build()?;
debug!(
default_threads,
rpc_threads,
trie_threads,
storage_threads,
proof_storage_worker_threads,
proof_account_worker_threads,
max_blocking_tasks = config.rayon.max_blocking_tasks,
"Initialized rayon thread pools"
);
(cpu_pool, rpc_pool, trie_pool, storage_pool, blocking_guard)
(
cpu_pool,
rpc_pool,
storage_pool,
blocking_guard,
proof_storage_worker_pool,
proof_account_worker_pool,
)
};
let task_manager_handle = handle.spawn(async move {
@@ -883,11 +902,13 @@ impl RuntimeBuilder {
#[cfg(feature = "rayon")]
rpc_pool,
#[cfg(feature = "rayon")]
trie_pool,
#[cfg(feature = "rayon")]
storage_pool,
#[cfg(feature = "rayon")]
blocking_guard,
#[cfg(feature = "rayon")]
proof_storage_worker_pool,
#[cfg(feature = "rayon")]
proof_account_worker_pool,
task_manager_handle: Mutex::new(Some(task_manager_handle)),
};

View File

@@ -325,7 +325,7 @@ mod tests {
reth_provider::providers::OverlayStateProviderFactory::new(factory, changeset_cache);
let task_ctx = ProofTaskCtx::new(factory);
let runtime = reth_tasks::Runtime::test();
let proof_worker_handle = ProofWorkerHandle::new(&runtime, task_ctx, 1, 1, false);
let proof_worker_handle = ProofWorkerHandle::new(&runtime, task_ctx, false);
let parallel_result = ParallelProof::new(Default::default(), proof_worker_handle.clone())
.decoded_multiproof(targets.clone())

View File

@@ -134,14 +134,10 @@ impl ProofWorkerHandle {
/// # Parameters
/// - `runtime`: The centralized runtime used to spawn blocking worker tasks
/// - `task_ctx`: Shared context with database view and prefix sets
/// - `storage_worker_count`: Number of storage workers to spawn
/// - `account_worker_count`: Number of account workers to spawn
/// - `v2_proofs_enabled`: Whether to enable V2 storage proofs
pub fn new<Factory>(
runtime: &Runtime,
task_ctx: ProofTaskCtx<Factory>,
storage_worker_count: usize,
account_worker_count: usize,
v2_proofs_enabled: bool,
) -> Self
where
@@ -153,13 +149,14 @@ impl ProofWorkerHandle {
let (storage_work_tx, storage_work_rx) = unbounded::<StorageWorkerJob>();
let (account_work_tx, account_work_rx) = unbounded::<AccountWorkerJob>();
// Initialize availability counters at zero. Each worker will increment when it
// successfully initializes, ensuring only healthy workers are counted.
let storage_available_workers = Arc::<AtomicUsize>::default();
let account_available_workers = Arc::<AtomicUsize>::default();
let cached_storage_roots = Arc::<DashMap<_, _>>::default();
let storage_worker_count = runtime.proof_storage_worker_pool().current_num_threads();
let account_worker_count = runtime.proof_account_worker_pool().current_num_threads();
debug!(
target: "trie::proof_task",
storage_worker_count,
@@ -168,113 +165,97 @@ impl ProofWorkerHandle {
"Spawning proof worker pools"
);
let this = Self {
storage_work_tx: storage_work_tx.clone(),
let storage_pool = runtime.proof_storage_worker_pool();
let task_ctx_for_storage = task_ctx.clone();
let cached_storage_roots_for_storage = cached_storage_roots.clone();
for worker_id in 0..storage_worker_count {
let span = debug_span!(target: "trie::proof_task", "storage worker", ?worker_id);
let task_ctx_clone = task_ctx_for_storage.clone();
let work_rx_clone = storage_work_rx.clone();
let storage_available_workers_clone = storage_available_workers.clone();
let cached_storage_roots = cached_storage_roots_for_storage.clone();
storage_pool.spawn(move || {
#[cfg(feature = "metrics")]
let metrics = ProofTaskTrieMetrics::default();
#[cfg(feature = "metrics")]
let cursor_metrics = ProofTaskCursorMetrics::new();
let _guard = span.enter();
let worker = StorageProofWorker::new(
task_ctx_clone,
work_rx_clone,
worker_id,
storage_available_workers_clone,
cached_storage_roots,
#[cfg(feature = "metrics")]
metrics,
#[cfg(feature = "metrics")]
cursor_metrics,
)
.with_v2_proofs(v2_proofs_enabled);
if let Err(error) = worker.run() {
error!(
target: "trie::proof_task",
worker_id,
?error,
"Storage worker failed"
);
}
});
}
let account_pool = runtime.proof_account_worker_pool();
for worker_id in 0..account_worker_count {
let span = debug_span!(target: "trie::proof_task", "account worker", ?worker_id);
let task_ctx_clone = task_ctx.clone();
let work_rx_clone = account_work_rx.clone();
let storage_work_tx_clone = storage_work_tx.clone();
let account_available_workers_clone = account_available_workers.clone();
let cached_storage_roots = cached_storage_roots.clone();
account_pool.spawn(move || {
#[cfg(feature = "metrics")]
let metrics = ProofTaskTrieMetrics::default();
#[cfg(feature = "metrics")]
let cursor_metrics = ProofTaskCursorMetrics::new();
let _guard = span.enter();
let worker = AccountProofWorker::new(
task_ctx_clone,
work_rx_clone,
worker_id,
storage_work_tx_clone,
account_available_workers_clone,
cached_storage_roots,
#[cfg(feature = "metrics")]
metrics,
#[cfg(feature = "metrics")]
cursor_metrics,
)
.with_v2_proofs(v2_proofs_enabled);
if let Err(error) = worker.run() {
error!(
target: "trie::proof_task",
worker_id,
?error,
"Account worker failed"
);
}
});
}
Self {
storage_work_tx,
account_work_tx,
storage_available_workers: storage_available_workers.clone(),
account_available_workers: account_available_workers.clone(),
storage_available_workers,
account_available_workers,
storage_worker_count,
account_worker_count,
v2_proofs_enabled,
};
let executor = runtime.handle().clone();
let task_ctx_for_storage = task_ctx.clone();
let executor_for_storage = executor.clone();
let cached_storage_roots_for_storage = cached_storage_roots.clone();
executor_for_storage.clone().spawn_blocking(move || {
let parent_span =
debug_span!(target: "trie::proof_task", "storage proof workers", ?storage_worker_count)
.entered();
// Spawn storage workers
for worker_id in 0..storage_worker_count {
let span = debug_span!(target: "trie::proof_task", "storage worker", ?worker_id);
let task_ctx_clone = task_ctx_for_storage.clone();
let work_rx_clone = storage_work_rx.clone();
let storage_available_workers_clone = storage_available_workers.clone();
let cached_storage_roots = cached_storage_roots_for_storage.clone();
executor_for_storage.spawn_blocking(move || {
#[cfg(feature = "metrics")]
let metrics = ProofTaskTrieMetrics::default();
#[cfg(feature = "metrics")]
let cursor_metrics = ProofTaskCursorMetrics::new();
let _guard = span.enter();
let worker = StorageProofWorker::new(
task_ctx_clone,
work_rx_clone,
worker_id,
storage_available_workers_clone,
cached_storage_roots,
#[cfg(feature = "metrics")]
metrics,
#[cfg(feature = "metrics")]
cursor_metrics,
)
.with_v2_proofs(v2_proofs_enabled);
if let Err(error) = worker.run() {
error!(
target: "trie::proof_task",
worker_id,
?error,
"Storage worker failed"
);
}
});
}
drop(parent_span);
});
executor.clone().spawn_blocking(move || {
let parent_span =
debug_span!(target: "trie::proof_task", "account proof workers", ?account_worker_count)
.entered();
// Spawn account workers
for worker_id in 0..account_worker_count {
let span = debug_span!(target: "trie::proof_task", "account worker", ?worker_id);
let task_ctx_clone = task_ctx.clone();
let work_rx_clone = account_work_rx.clone();
let storage_work_tx_clone = storage_work_tx.clone();
let account_available_workers_clone = account_available_workers.clone();
let cached_storage_roots = cached_storage_roots.clone();
executor.spawn_blocking(move || {
#[cfg(feature = "metrics")]
let metrics = ProofTaskTrieMetrics::default();
#[cfg(feature = "metrics")]
let cursor_metrics = ProofTaskCursorMetrics::new();
let _guard = span.enter();
let worker = AccountProofWorker::new(
task_ctx_clone,
work_rx_clone,
worker_id,
storage_work_tx_clone,
account_available_workers_clone,
cached_storage_roots,
#[cfg(feature = "metrics")]
metrics,
#[cfg(feature = "metrics")]
cursor_metrics,
)
.with_v2_proofs(v2_proofs_enabled);
if let Err(error) = worker.run() {
error!(
target: "trie::proof_task",
worker_id,
?error,
"Account worker failed"
);
}
});
}
drop(parent_span);
});
this
}
}
/// Returns whether V2 storage proofs are enabled for this worker pool.
@@ -2039,7 +2020,7 @@ mod tests {
let ctx = test_ctx(factory);
let runtime = reth_tasks::Runtime::test();
let proof_handle = ProofWorkerHandle::new(&runtime, ctx, 5, 3, false);
let proof_handle = ProofWorkerHandle::new(&runtime, ctx, false);
// Verify handle can be cloned
let _cloned_handle = proof_handle.clone();