perf: bound more channels with known upper limits (#22206)

Co-authored-by: Amp <amp@ampcode.com>
This commit is contained in:
Georgios Konstantopoulos
2026-02-15 18:05:43 -08:00
committed by GitHub
parent 47cef33a0d
commit ec982f8686
4 changed files with 15 additions and 17 deletions

View File

@@ -42,7 +42,7 @@ use std::{
ops::Range,
sync::{
atomic::{AtomicBool, Ordering},
mpsc::{self, channel, Receiver, Sender},
mpsc::{self, channel, Receiver, Sender, SyncSender},
Arc,
},
time::Instant,
@@ -154,8 +154,6 @@ where
self.executor.spawn_blocking(move || {
let _enter = debug_span!(target: "engine::tree::payload_processor::prewarm", parent: span, "spawn_all").entered();
let (done_tx, done_rx) = mpsc::channel();
// When transaction_count is 0, it means the count is unknown. In this case, spawn
// max workers to handle potentially many transactions in parallel rather
// than bottlenecking on a single worker.
@@ -166,6 +164,8 @@ where
transaction_count.min(max_concurrency)
};
let (done_tx, done_rx) = mpsc::sync_channel(workers_needed);
// Spawn workers
let tx_sender = ctx.clone().spawn_workers(workers_needed, &executor, to_multi_proof.clone(), done_tx.clone());
@@ -312,11 +312,11 @@ where
return;
}
let (done_tx, done_rx) = mpsc::channel();
// Calculate number of workers needed (at most max_concurrency)
let workers_needed = total_slots.min(self.max_concurrency);
let (done_tx, done_rx) = mpsc::sync_channel(workers_needed);
// Calculate slots per worker
let slots_per_worker = total_slots / workers_needed;
let remainder = total_slots % workers_needed;
@@ -585,7 +585,7 @@ where
self,
txs: CrossbeamReceiver<IndexedTransaction<Tx>>,
to_multi_proof: Option<CrossbeamSender<MultiProofMessage>>,
done_tx: Sender<()>,
done_tx: SyncSender<()>,
) where
Tx: ExecutableTxFor<Evm>,
{
@@ -660,7 +660,7 @@ where
workers_needed: usize,
task_executor: &Runtime,
to_multi_proof: Option<CrossbeamSender<MultiProofMessage>>,
done_tx: Sender<()>,
done_tx: SyncSender<()>,
) -> CrossbeamSender<IndexedTransaction<Tx>>
where
Tx: ExecutableTxFor<Evm> + Send + 'static,
@@ -698,7 +698,7 @@ where
executor: &Runtime,
bal: Arc<BlockAccessList>,
range: Range<usize>,
done_tx: Sender<()>,
done_tx: SyncSender<()>,
) {
let ctx = self.clone();
let span = debug_span!(
@@ -724,7 +724,7 @@ where
self,
bal: Arc<BlockAccessList>,
range: Range<usize>,
done_tx: Sender<()>,
done_tx: SyncSender<()>,
) {
let Self { saved_cache, provider, metrics, .. } = self;

View File

@@ -183,10 +183,9 @@ where
// channels used to return result of account hashing
for chunk in &accounts_cursor.walk(None)?.chunks(WORKER_CHUNK_SIZE) {
// An _unordered_ channel to receive results from a rayon job
let (tx, rx) = mpsc::channel();
channels.push(rx);
let chunk = chunk.collect::<Result<Vec<_>, _>>()?;
let (tx, rx) = mpsc::sync_channel(chunk.len());
channels.push(rx);
// Spawn the hashing task onto the global rayon pool
rayon::spawn(move || {
for (address, account) in chunk {

View File

@@ -110,10 +110,9 @@ where
for chunk in &storage_cursor.walk(None)?.chunks(WORKER_CHUNK_SIZE) {
// An _unordered_ channel to receive results from a rayon job
let (tx, rx) = mpsc::channel();
channels.push(rx);
let chunk = chunk.collect::<Result<Vec<_>, _>>()?;
let (tx, rx) = mpsc::sync_channel(chunk.len());
channels.push(rx);
// Spawn the hashing task onto the global rayon pool
rayon::spawn(move || {
// Cache hashed address since PlainStorageState is sorted by address

View File

@@ -34,7 +34,7 @@ const BATCH_SIZE: usize = 100_000;
const WORKER_CHUNK_SIZE: usize = 100;
/// Type alias for a sender that transmits the result of sender recovery.
type RecoveryResultSender = mpsc::Sender<Result<(u64, Address), Box<SenderRecoveryStageError>>>;
type RecoveryResultSender = mpsc::SyncSender<Result<(u64, Address), Box<SenderRecoveryStageError>>>;
/// The sender recovery stage iterates over existing transactions,
/// recovers the transaction signer and stores them
@@ -245,7 +245,7 @@ where
.step_by(WORKER_CHUNK_SIZE)
.map(|start| {
let range = start..std::cmp::min(start + WORKER_CHUNK_SIZE as u64, tx_range.end);
let (tx, rx) = mpsc::channel();
let (tx, rx) = mpsc::sync_channel((range.end - range.start) as usize);
// Range and channel sender will be sent to rayon worker
((range, tx), rx)
})