diff --git a/crates/trie/parallel/src/proof_task.rs b/crates/trie/parallel/src/proof_task.rs index 9c91f08da5..818f165d02 100644 --- a/crates/trie/parallel/src/proof_task.rs +++ b/crates/trie/parallel/src/proof_task.rs @@ -168,101 +168,112 @@ impl ProofWorkerHandle { "Spawning proof worker pools" ); - let parent_span = - debug_span!(target: "trie::proof_task", "storage proof workers", ?storage_worker_count) - .entered(); - // Spawn storage workers - for worker_id in 0..storage_worker_count { - let span = debug_span!(target: "trie::proof_task", "storage worker", ?worker_id); - let task_ctx_clone = task_ctx.clone(); - let work_rx_clone = storage_work_rx.clone(); - let storage_available_workers_clone = storage_available_workers.clone(); - let cached_storage_roots = cached_storage_roots.clone(); - - executor.spawn_blocking(move || { - #[cfg(feature = "metrics")] - let metrics = ProofTaskTrieMetrics::default(); - #[cfg(feature = "metrics")] - let cursor_metrics = ProofTaskCursorMetrics::new(); - - let _guard = span.enter(); - let worker = StorageProofWorker::new( - task_ctx_clone, - work_rx_clone, - worker_id, - storage_available_workers_clone, - cached_storage_roots, - #[cfg(feature = "metrics")] - metrics, - #[cfg(feature = "metrics")] - cursor_metrics, - ) - .with_v2_proofs(v2_proofs_enabled); - if let Err(error) = worker.run() { - error!( - target: "trie::proof_task", - worker_id, - ?error, - "Storage worker failed" - ); - } - }); - } - drop(parent_span); - - let parent_span = - debug_span!(target: "trie::proof_task", "account proof workers", ?account_worker_count) - .entered(); - // Spawn account workers - for worker_id in 0..account_worker_count { - let span = debug_span!(target: "trie::proof_task", "account worker", ?worker_id); - let task_ctx_clone = task_ctx.clone(); - let work_rx_clone = account_work_rx.clone(); - let storage_work_tx_clone = storage_work_tx.clone(); - let account_available_workers_clone = account_available_workers.clone(); - let cached_storage_roots = cached_storage_roots.clone(); - - executor.spawn_blocking(move || { - #[cfg(feature = "metrics")] - let metrics = ProofTaskTrieMetrics::default(); - #[cfg(feature = "metrics")] - let cursor_metrics = ProofTaskCursorMetrics::new(); - - let _guard = span.enter(); - let worker = AccountProofWorker::new( - task_ctx_clone, - work_rx_clone, - worker_id, - storage_work_tx_clone, - account_available_workers_clone, - cached_storage_roots, - #[cfg(feature = "metrics")] - metrics, - #[cfg(feature = "metrics")] - cursor_metrics, - ) - .with_v2_proofs(v2_proofs_enabled); - if let Err(error) = worker.run() { - error!( - target: "trie::proof_task", - worker_id, - ?error, - "Account worker failed" - ); - } - }); - } - drop(parent_span); - - Self { - storage_work_tx, + let this = Self { + storage_work_tx: storage_work_tx.clone(), account_work_tx, - storage_available_workers, - account_available_workers, + storage_available_workers: storage_available_workers.clone(), + account_available_workers: account_available_workers.clone(), storage_worker_count, account_worker_count, v2_proofs_enabled, - } + }; + + // Clone for the first spawn_blocking (storage workers) + let task_ctx_for_storage = task_ctx.clone(); + let executor_for_storage = executor.clone(); + let cached_storage_roots_for_storage = cached_storage_roots.clone(); + + executor_for_storage.clone().spawn_blocking(move || { + let parent_span = + debug_span!(target: "trie::proof_task", "storage proof workers", ?storage_worker_count) + .entered(); + // Spawn storage workers + for worker_id in 0..storage_worker_count { + let span = debug_span!(target: "trie::proof_task", "storage worker", ?worker_id); + let task_ctx_clone = task_ctx_for_storage.clone(); + let work_rx_clone = storage_work_rx.clone(); + let storage_available_workers_clone = storage_available_workers.clone(); + let cached_storage_roots = cached_storage_roots_for_storage.clone(); + + executor_for_storage.spawn_blocking(move || { + #[cfg(feature = "metrics")] + let metrics = ProofTaskTrieMetrics::default(); + #[cfg(feature = "metrics")] + let cursor_metrics = ProofTaskCursorMetrics::new(); + + let _guard = span.enter(); + let worker = StorageProofWorker::new( + task_ctx_clone, + work_rx_clone, + worker_id, + storage_available_workers_clone, + cached_storage_roots, + #[cfg(feature = "metrics")] + metrics, + #[cfg(feature = "metrics")] + cursor_metrics, + ) + .with_v2_proofs(v2_proofs_enabled); + if let Err(error) = worker.run() { + error!( + target: "trie::proof_task", + worker_id, + ?error, + "Storage worker failed" + ); + } + }); + } + drop(parent_span); + }); + + executor.clone().spawn_blocking(move || { + let parent_span = + debug_span!(target: "trie::proof_task", "account proof workers", ?account_worker_count) + .entered(); + // Spawn account workers + for worker_id in 0..account_worker_count { + let span = debug_span!(target: "trie::proof_task", "account worker", ?worker_id); + let task_ctx_clone = task_ctx.clone(); + let work_rx_clone = account_work_rx.clone(); + let storage_work_tx_clone = storage_work_tx.clone(); + let account_available_workers_clone = account_available_workers.clone(); + let cached_storage_roots = cached_storage_roots.clone(); + + executor.spawn_blocking(move || { + #[cfg(feature = "metrics")] + let metrics = ProofTaskTrieMetrics::default(); + #[cfg(feature = "metrics")] + let cursor_metrics = ProofTaskCursorMetrics::new(); + + let _guard = span.enter(); + let worker = AccountProofWorker::new( + task_ctx_clone, + work_rx_clone, + worker_id, + storage_work_tx_clone, + account_available_workers_clone, + cached_storage_roots, + #[cfg(feature = "metrics")] + metrics, + #[cfg(feature = "metrics")] + cursor_metrics, + ) + .with_v2_proofs(v2_proofs_enabled); + if let Err(error) = worker.run() { + error!( + target: "trie::proof_task", + worker_id, + ?error, + "Account worker failed" + ); + } + }); + } + drop(parent_span); + }); + + this } /// Returns whether V2 storage proofs are enabled for this worker pool.