perf: spawn proof workers in parallel

Similar to #20575 for prewarm workers, this moves proof worker spawning
into a single spawn_blocking task. Previously, ProofWorkerHandle::new()
would call spawn_blocking once per worker (~50 calls on main thread).

Now we spawn a single task that spawns all workers inside it, reducing
main thread spawn_blocking calls from ~50 to 1. The handle returns
immediately with channels ready - workers process jobs as they come online.
This commit is contained in:
yongkangc
2025-12-23 00:57:50 +00:00
parent 3b8acd4b07
commit 6b141525b9

View File

@@ -69,7 +69,7 @@ use std::{
time::{Duration, Instant},
};
use tokio::runtime::Handle;
use tracing::{debug, debug_span, error, trace};
use tracing::{debug, debug_span, error, trace, Span};
#[cfg(feature = "metrics")]
use crate::proof_task_metrics::{
@@ -108,6 +108,10 @@ impl ProofWorkerHandle {
/// Returns a handle for submitting proof tasks to the worker pools.
/// Workers run until the last handle is dropped.
///
/// Worker spawning is done in a separate blocking task to avoid blocking the main thread
/// on multiple `spawn_blocking` calls. The handle is returned immediately with channels
/// ready to receive work - workers will process jobs as they come online.
///
/// # Parameters
/// - `executor`: Tokio runtime handle for spawning blocking tasks
/// - `task_ctx`: Shared context with database view and prefix sets
@@ -140,85 +144,106 @@ impl ProofWorkerHandle {
"Spawning proof worker pools"
);
let parent_span =
debug_span!(target: "trie::proof_task", "storage proof workers", ?storage_worker_count)
.entered();
// Spawn storage workers
for worker_id in 0..storage_worker_count {
let span = debug_span!(target: "trie::proof_task", "storage worker", ?worker_id);
let task_ctx_clone = task_ctx.clone();
let work_rx_clone = storage_work_rx.clone();
let storage_available_workers_clone = storage_available_workers.clone();
// Clone what we need for the spawning task
let storage_available_workers_spawn = storage_available_workers.clone();
let account_available_workers_spawn = account_available_workers.clone();
let storage_work_tx_spawn = storage_work_tx.clone();
let parent_span = Span::current();
// Clone the executor handle for use inside the spawned task.
let executor_spawn = executor.clone();
executor.spawn_blocking(move || {
#[cfg(feature = "metrics")]
let metrics = ProofTaskTrieMetrics::default();
#[cfg(feature = "metrics")]
let cursor_metrics = ProofTaskCursorMetrics::new();
// Spawn a single task that spawns all workers in parallel.
// This moves the spawn_blocking overhead off the main thread.
executor.spawn_blocking(move || {
let executor = executor_spawn;
let _enter = parent_span.entered();
let _guard = span.enter();
let worker = StorageProofWorker::new(
task_ctx_clone,
work_rx_clone,
worker_id,
storage_available_workers_clone,
let storage_parent_span = debug_span!(
target: "trie::proof_task",
"storage proof workers",
?storage_worker_count
)
.entered();
// Spawn storage workers
for worker_id in 0..storage_worker_count {
let span = debug_span!(target: "trie::proof_task", "storage worker", ?worker_id);
let task_ctx_clone = task_ctx.clone();
let work_rx_clone = storage_work_rx.clone();
let storage_available_workers_clone = storage_available_workers_spawn.clone();
executor.spawn_blocking(move || {
#[cfg(feature = "metrics")]
metrics,
let metrics = ProofTaskTrieMetrics::default();
#[cfg(feature = "metrics")]
cursor_metrics,
);
if let Err(error) = worker.run() {
error!(
target: "trie::proof_task",
let cursor_metrics = ProofTaskCursorMetrics::new();
let _guard = span.enter();
let worker = StorageProofWorker::new(
task_ctx_clone,
work_rx_clone,
worker_id,
?error,
"Storage worker failed"
storage_available_workers_clone,
#[cfg(feature = "metrics")]
metrics,
#[cfg(feature = "metrics")]
cursor_metrics,
);
}
});
}
drop(parent_span);
if let Err(error) = worker.run() {
error!(
target: "trie::proof_task",
worker_id,
?error,
"Storage worker failed"
);
}
});
}
drop(storage_parent_span);
let parent_span =
debug_span!(target: "trie::proof_task", "account proof workers", ?account_worker_count)
.entered();
// Spawn account workers
for worker_id in 0..account_worker_count {
let span = debug_span!(target: "trie::proof_task", "account worker", ?worker_id);
let task_ctx_clone = task_ctx.clone();
let work_rx_clone = account_work_rx.clone();
let storage_work_tx_clone = storage_work_tx.clone();
let account_available_workers_clone = account_available_workers.clone();
let account_parent_span = debug_span!(
target: "trie::proof_task",
"account proof workers",
?account_worker_count
)
.entered();
// Spawn account workers
for worker_id in 0..account_worker_count {
let span = debug_span!(target: "trie::proof_task", "account worker", ?worker_id);
let task_ctx_clone = task_ctx.clone();
let work_rx_clone = account_work_rx.clone();
let storage_work_tx_clone = storage_work_tx_spawn.clone();
let account_available_workers_clone = account_available_workers_spawn.clone();
executor.spawn_blocking(move || {
#[cfg(feature = "metrics")]
let metrics = ProofTaskTrieMetrics::default();
#[cfg(feature = "metrics")]
let cursor_metrics = ProofTaskCursorMetrics::new();
let _guard = span.enter();
let worker = AccountProofWorker::new(
task_ctx_clone,
work_rx_clone,
worker_id,
storage_work_tx_clone,
account_available_workers_clone,
executor.spawn_blocking(move || {
#[cfg(feature = "metrics")]
metrics,
let metrics = ProofTaskTrieMetrics::default();
#[cfg(feature = "metrics")]
cursor_metrics,
);
if let Err(error) = worker.run() {
error!(
target: "trie::proof_task",
let cursor_metrics = ProofTaskCursorMetrics::new();
let _guard = span.enter();
let worker = AccountProofWorker::new(
task_ctx_clone,
work_rx_clone,
worker_id,
?error,
"Account worker failed"
storage_work_tx_clone,
account_available_workers_clone,
#[cfg(feature = "metrics")]
metrics,
#[cfg(feature = "metrics")]
cursor_metrics,
);
}
});
}
drop(parent_span);
if let Err(error) = worker.run() {
error!(
target: "trie::proof_task",
worker_id,
?error,
"Account worker failed"
);
}
});
}
drop(account_parent_span);
});
Self {
storage_work_tx,