From ee0eb79da9dfd499a44e3b233bb6f40f708398ec Mon Sep 17 00:00:00 2001
From: Yong Kang
Date: Tue, 7 Oct 2025 10:48:12 +0000
Subject: [PATCH] using unbounded queue

---
 crates/trie/parallel/src/proof_task.rs | 36 ++++++++------------------
 1 file changed, 11 insertions(+), 25 deletions(-)

diff --git a/crates/trie/parallel/src/proof_task.rs b/crates/trie/parallel/src/proof_task.rs
index 81b2fcfeb4..0c01542952 100644
--- a/crates/trie/parallel/src/proof_task.rs
+++ b/crates/trie/parallel/src/proof_task.rs
@@ -10,7 +10,7 @@
 use crate::root::ParallelStateRootError;
 use alloy_primitives::{map::B256Set, B256};
-use crossbeam_channel::{bounded, Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
+use crossbeam_channel::{unbounded, Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
 use reth_db_api::transaction::DbTx;
 use reth_execution_errors::{SparseTrieError, SparseTrieErrorKind};
 use reth_provider::{
@@ -72,8 +72,9 @@ struct StorageProofJob {
 ///
 /// 1. **Storage Worker Pool**:
 ///    - Pre-spawned workers with dedicated long-lived transactions
-///    - Tasks queued via crossbeam bounded channel
+///    - Tasks queued via crossbeam unbounded channel
 ///    - Workers continuously process without transaction overhead
+///    - Unbounded queue ensures all storage proofs benefit from transaction reuse
 ///
 /// 2. **On-Demand Execution**:
 ///    - Lazy transaction creation for blinded node fetches
@@ -253,17 +254,15 @@ where
             );
         }
 
-        // Use 4x buffering to prevent queue saturation under burst load.
-        // Deeper queue reduces fallback to slower on-demand execution when workers are busy.
-        let queue_capacity = planned_workers.saturating_mul(4).max(1);
-        let (storage_work_tx, storage_work_rx) = bounded::<StorageProofJob>(queue_capacity);
+        // Use unbounded channel to ensure all storage proofs are queued to workers.
+        // This maintains transaction reuse benefits and avoids fallback to on-demand execution.
+        let (storage_work_tx, storage_work_rx) = unbounded::<StorageProofJob>();
 
         tracing::info!(
             target: "trie::proof_task",
             storage_worker_count = planned_workers,
-            queue_capacity,
             max_concurrency,
-            "Initializing storage proof worker pool"
+            "Initializing storage proof worker pool with unbounded queue"
         );
 
         let mut spawned_workers = 0;
@@ -402,8 +401,8 @@ where
     ///
    /// # Task Routing
     ///
-    /// - **Storage Proofs**: Routed to pre-spawned worker pool via bounded channel. Falls back to
-    ///   on-demand spawn if channel is full or disconnected.
+    /// - **Storage Proofs**: Routed to pre-spawned worker pool via unbounded channel. Only falls
+    ///   back to on-demand if workers are disconnected (e.g., all workers panicked).
     /// - **Blinded Nodes**: Queued for on-demand execution.
     ///
     /// # Shutdown
@@ -419,7 +418,7 @@ where
             ProofTaskKind::StorageProof(input, sender) => {
                 match self
                     .storage_work_tx
-                    .try_send(StorageProofJob { input, result_sender: sender })
+                    .send(StorageProofJob { input, result_sender: sender })
                 {
                     Ok(_) => {
                         tracing::trace!(
                             target: "trie::proof_task",
                             "Storage proof dispatched to worker pool"
                         );
                     }
-                    Err(crossbeam_channel::TrySendError::Full(job)) => {
-                        tracing::debug!(
-                            target: "trie::proof_task",
-                            "Worker pool queue full, spawning on-demand"
-                        );
-
-                        self.on_demand_queue.push_back(
-                            ProofTaskKind::StorageProof(
-                                job.input,
-                                job.result_sender,
-                            ),
-                        );
-                    }
-                    Err(crossbeam_channel::TrySendError::Disconnected(job)) => {
+                    Err(crossbeam_channel::SendError(job)) => {
                         tracing::warn!(
                             target: "trie::proof_task",
                             storage_worker_count = self.storage_worker_count,
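
For context, the behavioral core of this patch is the difference between crossbeam's bounded try_send and unbounded send: with an unbounded channel, the Full case simply cannot occur, so the on-demand fallback for queue saturation becomes dead code. A minimal standalone sketch of those semantics, using toy u32 payloads rather than this crate's StorageProofJob type:

use crossbeam_channel::{bounded, unbounded, SendError, TrySendError};

fn main() {
    // Pre-patch shape: bounded queue, where try_send can report Full
    // under burst load, forcing the slower on-demand fallback path.
    let (tx, _rx) = bounded::<u32>(1);
    tx.try_send(1).unwrap();
    assert!(matches!(tx.try_send(2), Err(TrySendError::Full(2))));

    // Post-patch shape: unbounded queue, where send never blocks and
    // only fails once every receiver has been dropped, i.e. all
    // workers are gone (the SendError(job) arm kept by this patch).
    let (tx, rx) = unbounded::<u32>();
    tx.send(1).unwrap();
    drop(rx);
    assert!(matches!(tx.send(2), Err(SendError(2))));
}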
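
And a sketch of the dispatch pattern the pool relies on after this change: pre-spawned workers sharing one unbounded MPMC receiver and draining it until the channel closes. The Job type, worker count, and payloads here are illustrative stand-ins, not code from this crate:

use crossbeam_channel::unbounded;
use std::thread;

struct Job(u64);

fn main() {
    // Unbounded queue: every job is accepted immediately, so all
    // storage-proof-style work reaches a long-lived worker and keeps
    // the transaction-reuse benefit.
    let (tx, rx) = unbounded::<Job>();

    // Pre-spawned workers; crossbeam channels are MPMC, so each
    // receiver clone pulls from the same shared queue.
    let workers: Vec<_> = (0..4)
        .map(|id| {
            let rx = rx.clone();
            thread::spawn(move || {
                // recv() returns Err only after the channel is empty
                // and every sender has been dropped.
                while let Ok(Job(n)) = rx.recv() {
                    println!("worker {id} processed job {n}");
                }
            })
        })
        .collect();

    for n in 0..16 {
        // The only dispatch failure mode left is disconnection.
        if tx.send(Job(n)).is_err() {
            eprintln!("all workers exited; job {n} was not dispatched");
        }
    }

    drop(tx); // close the queue so idle workers drain and exit
    for handle in workers {
        handle.join().unwrap();
    }
}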