perf: spawn multiproof workers in parallel

This commit is contained in:
Arsenii Kulikov
2025-12-23 03:00:59 +04:00
parent eec76a3faf
commit 261839bdce

View File

@@ -114,7 +114,7 @@ impl ProofWorkerHandle {
/// - `storage_worker_count`: Number of storage workers to spawn
/// - `account_worker_count`: Number of account workers to spawn
pub fn new<Factory>(
executor: Handle,
task_executor: Handle,
task_ctx: ProofTaskCtx<Factory>,
storage_worker_count: usize,
account_worker_count: usize,
@@ -140,94 +140,99 @@ impl ProofWorkerHandle {
"Spawning proof worker pools"
);
let parent_span =
debug_span!(target: "trie::proof_task", "storage proof workers", ?storage_worker_count)
.entered();
// Spawn storage workers
for worker_id in 0..storage_worker_count {
let span = debug_span!(target: "trie::proof_task", "storage worker", ?worker_id);
let task_ctx_clone = task_ctx.clone();
let work_rx_clone = storage_work_rx.clone();
let storage_available_workers_clone = storage_available_workers.clone();
executor.spawn_blocking(move || {
#[cfg(feature = "metrics")]
let metrics = ProofTaskTrieMetrics::default();
#[cfg(feature = "metrics")]
let cursor_metrics = ProofTaskCursorMetrics::new();
let _guard = span.enter();
let worker = StorageProofWorker::new(
task_ctx_clone,
work_rx_clone,
worker_id,
storage_available_workers_clone,
#[cfg(feature = "metrics")]
metrics,
#[cfg(feature = "metrics")]
cursor_metrics,
);
if let Err(error) = worker.run() {
error!(
target: "trie::proof_task",
worker_id,
?error,
"Storage worker failed"
);
}
});
}
drop(parent_span);
let parent_span =
debug_span!(target: "trie::proof_task", "account proof workers", ?account_worker_count)
.entered();
// Spawn account workers
for worker_id in 0..account_worker_count {
let span = debug_span!(target: "trie::proof_task", "account worker", ?worker_id);
let task_ctx_clone = task_ctx.clone();
let work_rx_clone = account_work_rx.clone();
let storage_work_tx_clone = storage_work_tx.clone();
let account_available_workers_clone = account_available_workers.clone();
executor.spawn_blocking(move || {
#[cfg(feature = "metrics")]
let metrics = ProofTaskTrieMetrics::default();
#[cfg(feature = "metrics")]
let cursor_metrics = ProofTaskCursorMetrics::new();
let _guard = span.enter();
let worker = AccountProofWorker::new(
task_ctx_clone,
work_rx_clone,
worker_id,
storage_work_tx_clone,
account_available_workers_clone,
#[cfg(feature = "metrics")]
metrics,
#[cfg(feature = "metrics")]
cursor_metrics,
);
if let Err(error) = worker.run() {
error!(
target: "trie::proof_task",
worker_id,
?error,
"Account worker failed"
);
}
});
}
drop(parent_span);
Self {
storage_work_tx,
let this = Self {
storage_work_tx: storage_work_tx.clone(),
account_work_tx,
storage_available_workers,
account_available_workers,
storage_available_workers: storage_available_workers.clone(),
account_available_workers: account_available_workers.clone(),
storage_worker_count,
account_worker_count,
}
};
let executor = task_executor.clone();
task_executor.spawn_blocking(move || {
let parent_span =
debug_span!(target: "trie::proof_task", "storage proof workers", ?storage_worker_count)
.entered();
// Spawn storage workers
for worker_id in 0..storage_worker_count {
let span = debug_span!(target: "trie::proof_task", "storage worker", ?worker_id);
let task_ctx_clone = task_ctx.clone();
let work_rx_clone = storage_work_rx.clone();
let storage_available_workers_clone = storage_available_workers.clone();
executor.spawn_blocking(move || {
#[cfg(feature = "metrics")]
let metrics = ProofTaskTrieMetrics::default();
#[cfg(feature = "metrics")]
let cursor_metrics = ProofTaskCursorMetrics::new();
let _guard = span.enter();
let worker = StorageProofWorker::new(
task_ctx_clone,
work_rx_clone,
worker_id,
storage_available_workers_clone,
#[cfg(feature = "metrics")]
metrics,
#[cfg(feature = "metrics")]
cursor_metrics,
);
if let Err(error) = worker.run() {
error!(
target: "trie::proof_task",
worker_id,
?error,
"Storage worker failed"
);
}
});
}
drop(parent_span);
let parent_span =
debug_span!(target: "trie::proof_task", "account proof workers", ?account_worker_count)
.entered();
// Spawn account workers
for worker_id in 0..account_worker_count {
let span = debug_span!(target: "trie::proof_task", "account worker", ?worker_id);
let task_ctx_clone = task_ctx.clone();
let work_rx_clone = account_work_rx.clone();
let storage_work_tx_clone = storage_work_tx.clone();
let account_available_workers_clone = account_available_workers.clone();
executor.spawn_blocking(move || {
#[cfg(feature = "metrics")]
let metrics = ProofTaskTrieMetrics::default();
#[cfg(feature = "metrics")]
let cursor_metrics = ProofTaskCursorMetrics::new();
let _guard = span.enter();
let worker = AccountProofWorker::new(
task_ctx_clone,
work_rx_clone,
worker_id,
storage_work_tx_clone,
account_available_workers_clone,
#[cfg(feature = "metrics")]
metrics,
#[cfg(feature = "metrics")]
cursor_metrics,
);
if let Err(error) = worker.run() {
error!(
target: "trie::proof_task",
worker_id,
?error,
"Account worker failed"
);
}
});
}
drop(parent_span);
});
this
}
/// Returns how many storage workers are currently available/idle.