From ec982f8686ae382fe36fc677d3337b33f45f24c1 Mon Sep 17 00:00:00 2001 From: Georgios Konstantopoulos Date: Sun, 15 Feb 2026 18:05:43 -0800 Subject: [PATCH] perf: bound more channels with known upper limits (#22206) Co-authored-by: Amp --- .../tree/src/tree/payload_processor/prewarm.rs | 18 +++++++++--------- .../stages/src/stages/hashing_account.rs | 5 ++--- .../stages/src/stages/hashing_storage.rs | 5 ++--- .../stages/src/stages/sender_recovery.rs | 4 ++-- 4 files changed, 15 insertions(+), 17 deletions(-) diff --git a/crates/engine/tree/src/tree/payload_processor/prewarm.rs b/crates/engine/tree/src/tree/payload_processor/prewarm.rs index 3711d719e7..2b45acad2f 100644 --- a/crates/engine/tree/src/tree/payload_processor/prewarm.rs +++ b/crates/engine/tree/src/tree/payload_processor/prewarm.rs @@ -42,7 +42,7 @@ use std::{ ops::Range, sync::{ atomic::{AtomicBool, Ordering}, - mpsc::{self, channel, Receiver, Sender}, + mpsc::{self, channel, Receiver, Sender, SyncSender}, Arc, }, time::Instant, @@ -154,8 +154,6 @@ where self.executor.spawn_blocking(move || { let _enter = debug_span!(target: "engine::tree::payload_processor::prewarm", parent: span, "spawn_all").entered(); - let (done_tx, done_rx) = mpsc::channel(); - // When transaction_count is 0, it means the count is unknown. In this case, spawn // max workers to handle potentially many transactions in parallel rather // than bottlenecking on a single worker. @@ -166,6 +164,8 @@ where transaction_count.min(max_concurrency) }; + let (done_tx, done_rx) = mpsc::sync_channel(workers_needed); + // Spawn workers let tx_sender = ctx.clone().spawn_workers(workers_needed, &executor, to_multi_proof.clone(), done_tx.clone()); @@ -312,11 +312,11 @@ where return; } - let (done_tx, done_rx) = mpsc::channel(); - // Calculate number of workers needed (at most max_concurrency) let workers_needed = total_slots.min(self.max_concurrency); + let (done_tx, done_rx) = mpsc::sync_channel(workers_needed); + // Calculate slots per worker let slots_per_worker = total_slots / workers_needed; let remainder = total_slots % workers_needed; @@ -585,7 +585,7 @@ where self, txs: CrossbeamReceiver>, to_multi_proof: Option>, - done_tx: Sender<()>, + done_tx: SyncSender<()>, ) where Tx: ExecutableTxFor, { @@ -660,7 +660,7 @@ where workers_needed: usize, task_executor: &Runtime, to_multi_proof: Option>, - done_tx: Sender<()>, + done_tx: SyncSender<()>, ) -> CrossbeamSender> where Tx: ExecutableTxFor + Send + 'static, @@ -698,7 +698,7 @@ where executor: &Runtime, bal: Arc, range: Range, - done_tx: Sender<()>, + done_tx: SyncSender<()>, ) { let ctx = self.clone(); let span = debug_span!( @@ -724,7 +724,7 @@ where self, bal: Arc, range: Range, - done_tx: Sender<()>, + done_tx: SyncSender<()>, ) { let Self { saved_cache, provider, metrics, .. } = self; diff --git a/crates/stages/stages/src/stages/hashing_account.rs b/crates/stages/stages/src/stages/hashing_account.rs index 8eceb9f364..72cd5c3217 100644 --- a/crates/stages/stages/src/stages/hashing_account.rs +++ b/crates/stages/stages/src/stages/hashing_account.rs @@ -183,10 +183,9 @@ where // channels used to return result of account hashing for chunk in &accounts_cursor.walk(None)?.chunks(WORKER_CHUNK_SIZE) { // An _unordered_ channel to receive results from a rayon job - let (tx, rx) = mpsc::channel(); - channels.push(rx); - let chunk = chunk.collect::, _>>()?; + let (tx, rx) = mpsc::sync_channel(chunk.len()); + channels.push(rx); // Spawn the hashing task onto the global rayon pool rayon::spawn(move || { for (address, account) in chunk { diff --git a/crates/stages/stages/src/stages/hashing_storage.rs b/crates/stages/stages/src/stages/hashing_storage.rs index 6edc47fbea..c3252eff20 100644 --- a/crates/stages/stages/src/stages/hashing_storage.rs +++ b/crates/stages/stages/src/stages/hashing_storage.rs @@ -110,10 +110,9 @@ where for chunk in &storage_cursor.walk(None)?.chunks(WORKER_CHUNK_SIZE) { // An _unordered_ channel to receive results from a rayon job - let (tx, rx) = mpsc::channel(); - channels.push(rx); - let chunk = chunk.collect::, _>>()?; + let (tx, rx) = mpsc::sync_channel(chunk.len()); + channels.push(rx); // Spawn the hashing task onto the global rayon pool rayon::spawn(move || { // Cache hashed address since PlainStorageState is sorted by address diff --git a/crates/stages/stages/src/stages/sender_recovery.rs b/crates/stages/stages/src/stages/sender_recovery.rs index 2bb4ca6587..475917f673 100644 --- a/crates/stages/stages/src/stages/sender_recovery.rs +++ b/crates/stages/stages/src/stages/sender_recovery.rs @@ -34,7 +34,7 @@ const BATCH_SIZE: usize = 100_000; const WORKER_CHUNK_SIZE: usize = 100; /// Type alias for a sender that transmits the result of sender recovery. -type RecoveryResultSender = mpsc::Sender>>; +type RecoveryResultSender = mpsc::SyncSender>>; /// The sender recovery stage iterates over existing transactions, /// recovers the transaction signer and stores them @@ -245,7 +245,7 @@ where .step_by(WORKER_CHUNK_SIZE) .map(|start| { let range = start..std::cmp::min(start + WORKER_CHUNK_SIZE as u64, tx_range.end); - let (tx, rx) = mpsc::channel(); + let (tx, rx) = mpsc::sync_channel((range.end - range.start) as usize); // Range and channel sender will be sent to rayon worker ((range, tx), rx) })