From 261839bdce856e6c9b0041bc2e8947d289fd17c5 Mon Sep 17 00:00:00 2001 From: Arsenii Kulikov Date: Tue, 23 Dec 2025 03:00:59 +0400 Subject: [PATCH] perf: spawn multiproof workers in parallel --- crates/trie/parallel/src/proof_task.rs | 177 +++++++++++++------------ 1 file changed, 91 insertions(+), 86 deletions(-) diff --git a/crates/trie/parallel/src/proof_task.rs b/crates/trie/parallel/src/proof_task.rs index 58dc99fc37..371a272f05 100644 --- a/crates/trie/parallel/src/proof_task.rs +++ b/crates/trie/parallel/src/proof_task.rs @@ -114,7 +114,7 @@ impl ProofWorkerHandle { /// - `storage_worker_count`: Number of storage workers to spawn /// - `account_worker_count`: Number of account workers to spawn pub fn new( - executor: Handle, + task_executor: Handle, task_ctx: ProofTaskCtx, storage_worker_count: usize, account_worker_count: usize, @@ -140,94 +140,99 @@ impl ProofWorkerHandle { "Spawning proof worker pools" ); - let parent_span = - debug_span!(target: "trie::proof_task", "storage proof workers", ?storage_worker_count) - .entered(); - // Spawn storage workers - for worker_id in 0..storage_worker_count { - let span = debug_span!(target: "trie::proof_task", "storage worker", ?worker_id); - let task_ctx_clone = task_ctx.clone(); - let work_rx_clone = storage_work_rx.clone(); - let storage_available_workers_clone = storage_available_workers.clone(); - - executor.spawn_blocking(move || { - #[cfg(feature = "metrics")] - let metrics = ProofTaskTrieMetrics::default(); - #[cfg(feature = "metrics")] - let cursor_metrics = ProofTaskCursorMetrics::new(); - - let _guard = span.enter(); - let worker = StorageProofWorker::new( - task_ctx_clone, - work_rx_clone, - worker_id, - storage_available_workers_clone, - #[cfg(feature = "metrics")] - metrics, - #[cfg(feature = "metrics")] - cursor_metrics, - ); - if let Err(error) = worker.run() { - error!( - target: "trie::proof_task", - worker_id, - ?error, - "Storage worker failed" - ); - } - }); - } - drop(parent_span); - - let parent_span = - debug_span!(target: "trie::proof_task", "account proof workers", ?account_worker_count) - .entered(); - // Spawn account workers - for worker_id in 0..account_worker_count { - let span = debug_span!(target: "trie::proof_task", "account worker", ?worker_id); - let task_ctx_clone = task_ctx.clone(); - let work_rx_clone = account_work_rx.clone(); - let storage_work_tx_clone = storage_work_tx.clone(); - let account_available_workers_clone = account_available_workers.clone(); - - executor.spawn_blocking(move || { - #[cfg(feature = "metrics")] - let metrics = ProofTaskTrieMetrics::default(); - #[cfg(feature = "metrics")] - let cursor_metrics = ProofTaskCursorMetrics::new(); - - let _guard = span.enter(); - let worker = AccountProofWorker::new( - task_ctx_clone, - work_rx_clone, - worker_id, - storage_work_tx_clone, - account_available_workers_clone, - #[cfg(feature = "metrics")] - metrics, - #[cfg(feature = "metrics")] - cursor_metrics, - ); - if let Err(error) = worker.run() { - error!( - target: "trie::proof_task", - worker_id, - ?error, - "Account worker failed" - ); - } - }); - } - drop(parent_span); - - Self { - storage_work_tx, + let this = Self { + storage_work_tx: storage_work_tx.clone(), account_work_tx, - storage_available_workers, - account_available_workers, + storage_available_workers: storage_available_workers.clone(), + account_available_workers: account_available_workers.clone(), storage_worker_count, account_worker_count, - } + }; + + let executor = task_executor.clone(); + task_executor.spawn_blocking(move || { + let parent_span = + debug_span!(target: "trie::proof_task", "storage proof workers", ?storage_worker_count) + .entered(); + // Spawn storage workers + for worker_id in 0..storage_worker_count { + let span = debug_span!(target: "trie::proof_task", "storage worker", ?worker_id); + let task_ctx_clone = task_ctx.clone(); + let work_rx_clone = storage_work_rx.clone(); + let storage_available_workers_clone = storage_available_workers.clone(); + + executor.spawn_blocking(move || { + #[cfg(feature = "metrics")] + let metrics = ProofTaskTrieMetrics::default(); + #[cfg(feature = "metrics")] + let cursor_metrics = ProofTaskCursorMetrics::new(); + + let _guard = span.enter(); + let worker = StorageProofWorker::new( + task_ctx_clone, + work_rx_clone, + worker_id, + storage_available_workers_clone, + #[cfg(feature = "metrics")] + metrics, + #[cfg(feature = "metrics")] + cursor_metrics, + ); + if let Err(error) = worker.run() { + error!( + target: "trie::proof_task", + worker_id, + ?error, + "Storage worker failed" + ); + } + }); + } + drop(parent_span); + + let parent_span = + debug_span!(target: "trie::proof_task", "account proof workers", ?account_worker_count) + .entered(); + // Spawn account workers + for worker_id in 0..account_worker_count { + let span = debug_span!(target: "trie::proof_task", "account worker", ?worker_id); + let task_ctx_clone = task_ctx.clone(); + let work_rx_clone = account_work_rx.clone(); + let storage_work_tx_clone = storage_work_tx.clone(); + let account_available_workers_clone = account_available_workers.clone(); + + executor.spawn_blocking(move || { + #[cfg(feature = "metrics")] + let metrics = ProofTaskTrieMetrics::default(); + #[cfg(feature = "metrics")] + let cursor_metrics = ProofTaskCursorMetrics::new(); + + let _guard = span.enter(); + let worker = AccountProofWorker::new( + task_ctx_clone, + work_rx_clone, + worker_id, + storage_work_tx_clone, + account_available_workers_clone, + #[cfg(feature = "metrics")] + metrics, + #[cfg(feature = "metrics")] + cursor_metrics, + ); + if let Err(error) = worker.run() { + error!( + target: "trie::proof_task", + worker_id, + ?error, + "Account worker failed" + ); + } + }); + } + drop(parent_span); + }); + + this } /// Returns how many storage workers are currently available/idle.