using unbounded queue

This commit is contained in:
Yong Kang
2025-10-07 10:48:12 +00:00
parent e1be0f9cc3
commit ee0eb79da9

View File

@@ -10,7 +10,7 @@
use crate::root::ParallelStateRootError;
use alloy_primitives::{map::B256Set, B256};
use crossbeam_channel::{bounded, Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
use crossbeam_channel::{unbounded, Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
use reth_db_api::transaction::DbTx;
use reth_execution_errors::{SparseTrieError, SparseTrieErrorKind};
use reth_provider::{
@@ -72,8 +72,9 @@ struct StorageProofJob {
///
/// 1. **Storage Worker Pool**:
/// - Pre-spawned workers with dedicated long-lived transactions
/// - Tasks queued via crossbeam bounded channel
/// - Tasks queued via crossbeam unbounded channel
/// - Workers continuously process without transaction overhead
/// - Unbounded queue ensures all storage proofs benefit from transaction reuse
///
/// 2. **On-Demand Execution**:
/// - Lazy transaction creation for blinded node fetches
@@ -253,17 +254,15 @@ where
);
}
// Use 4x buffering to prevent queue saturation under burst load.
// Deeper queue reduces fallback to slower on-demand execution when workers are busy.
let queue_capacity = planned_workers.saturating_mul(4).max(1);
let (storage_work_tx, storage_work_rx) = bounded::<StorageProofJob>(queue_capacity);
// Use unbounded channel to ensure all storage proofs are queued to workers.
// This maintains transaction reuse benefits and avoids fallback to on-demand execution.
let (storage_work_tx, storage_work_rx) = unbounded::<StorageProofJob>();
tracing::info!(
target: "trie::proof_task",
storage_worker_count = planned_workers,
queue_capacity,
max_concurrency,
"Initializing storage proof worker pool"
"Initializing storage proof worker pool with unbounded queue"
);
let mut spawned_workers = 0;
@@ -402,8 +401,8 @@ where
///
/// # Task Routing
///
/// - **Storage Proofs**: Routed to pre-spawned worker pool via bounded channel. Falls back to
/// on-demand spawn if channel is full or disconnected.
/// - **Storage Proofs**: Routed to pre-spawned worker pool via unbounded channel. Only falls
/// back to on-demand if workers are disconnected (e.g., all workers panicked).
/// - **Blinded Nodes**: Queued for on-demand execution.
///
/// # Shutdown
@@ -419,7 +418,7 @@ where
ProofTaskKind::StorageProof(input, sender) => {
match self
.storage_work_tx
.try_send(StorageProofJob { input, result_sender: sender })
.send(StorageProofJob { input, result_sender: sender })
{
Ok(_) => {
tracing::trace!(
@@ -427,20 +426,7 @@ where
"Storage proof dispatched to worker pool"
);
}
Err(crossbeam_channel::TrySendError::Full(job)) => {
tracing::debug!(
target: "trie::proof_task",
"Worker pool queue full, spawning on-demand"
);
self.on_demand_queue.push_back(
ProofTaskKind::StorageProof(
job.input,
job.result_sender,
),
);
}
Err(crossbeam_channel::TrySendError::Disconnected(job)) => {
Err(crossbeam_channel::SendError(job)) => {
tracing::warn!(
target: "trie::proof_task",
storage_worker_count = self.storage_worker_count,