perf: use shared channel for prewarm workers (#21429)

This commit is contained in:
Arsenii Kulikov
2026-01-26 19:49:44 +04:00
committed by GitHub
parent b87cde5479
commit 768a687189

View File

@@ -26,11 +26,11 @@ use alloy_consensus::transaction::TxHashRef;
use alloy_eip7928::BlockAccessList;
use alloy_evm::Database;
use alloy_primitives::{keccak256, map::B256Set, B256};
use crossbeam_channel::Sender as CrossbeamSender;
use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
use metrics::{Counter, Gauge, Histogram};
use reth_evm::{execute::ExecutableTxFor, ConfigureEvm, Evm, EvmFor, RecoveredTx, SpecFor};
use reth_metrics::Metrics;
use reth_primitives_traits::{NodePrimitives, SignedTransaction};
use reth_primitives_traits::NodePrimitives;
use reth_provider::{
AccountReader, BlockExecutionOutput, BlockReader, StateProvider, StateProviderFactory,
StateReader,
@@ -163,8 +163,8 @@ where
transaction_count_hint.min(max_concurrency)
};
// Initialize worker handles container
let handles = ctx.clone().spawn_workers(workers_needed, &executor, actions_tx.clone(), done_tx.clone());
// Spawn workers
let tx_sender = ctx.clone().spawn_workers(workers_needed, &executor, actions_tx.clone(), done_tx.clone());
// Distribute transactions to workers
let mut tx_index = 0usize;
@@ -179,37 +179,18 @@ where
}
let indexed_tx = IndexedTransaction { index: tx_index, tx };
let is_system_tx = indexed_tx.tx.tx().is_system_tx();
// System transactions (type > 4) in the first position set critical metadata
// that affects all subsequent transactions (e.g., L1 block info on L2s).
// Broadcast the first system transaction to all workers to ensure they have
// the critical state. This is particularly important for L2s like Optimism
// where the first deposit transaction (type 126) contains essential block metadata.
if tx_index == 0 && is_system_tx {
for handle in &handles {
// Ignore send errors: workers listen to terminate_execution and may
// exit early when signaled. Sending to a disconnected worker is
// possible and harmless and should happen at most once due to
// the terminate_execution check above.
let _ = handle.send(indexed_tx.clone());
}
} else {
// Round-robin distribution for all other transactions
let worker_idx = tx_index % workers_needed;
// Ignore send errors: workers listen to terminate_execution and may
// exit early when signaled. Sending to a disconnected worker is
// possible and harmless and should happen at most once due to
// the terminate_execution check above.
let _ = handles[worker_idx].send(indexed_tx);
}
// Send transaction to the workers
// Ignore send errors: workers listen to terminate_execution and may
// exit early when signaled.
let _ = tx_sender.send(indexed_tx);
tx_index += 1;
}
// drop handle and wait for all tasks to finish and drop theirs
// drop sender and wait for all tasks to finish
drop(done_tx);
drop(handles);
drop(tx_sender);
while done_rx.recv().is_ok() {}
let _ = actions_tx
@@ -548,7 +529,7 @@ where
Some((evm, metrics, terminate_execution, v2_proofs_enabled))
}
/// Accepts an [`mpsc::Receiver`] of transactions and a handle to prewarm task. Executes
/// Accepts a [`CrossbeamReceiver`] of transactions and a handle to prewarm task. Executes
/// transactions and streams [`PrewarmTaskEvent::Outcome`] messages for each transaction.
///
/// This function processes transactions sequentially from the receiver and emits outcome events
@@ -560,7 +541,7 @@ where
#[instrument(level = "debug", target = "engine::tree::payload_processor::prewarm", skip_all)]
fn transact_batch<Tx>(
self,
txs: mpsc::Receiver<IndexedTransaction<Tx>>,
txs: CrossbeamReceiver<IndexedTransaction<Tx>>,
sender: Sender<PrewarmTaskEvent<N::Receipt>>,
done_tx: Sender<()>,
) where
@@ -647,35 +628,31 @@ where
let _ = done_tx.send(());
}
/// Spawns a worker task for transaction execution and returns its sender channel.
/// Spawns worker tasks that pull transactions from a shared channel.
///
/// Returns the sender for distributing transactions to workers.
fn spawn_workers<Tx>(
self,
workers_needed: usize,
task_executor: &WorkloadExecutor,
actions_tx: Sender<PrewarmTaskEvent<N::Receipt>>,
done_tx: Sender<()>,
) -> Vec<mpsc::Sender<IndexedTransaction<Tx>>>
) -> CrossbeamSender<IndexedTransaction<Tx>>
where
Tx: ExecutableTxFor<Evm> + Send + 'static,
{
let mut handles = Vec::with_capacity(workers_needed);
let mut receivers = Vec::with_capacity(workers_needed);
let (tx_sender, tx_receiver) = crossbeam_channel::unbounded();
for _ in 0..workers_needed {
let (tx, rx) = mpsc::channel();
handles.push(tx);
receivers.push(rx);
}
// Spawn a separate task spawning workers in parallel.
// Spawn workers that all pull from the shared receiver
let executor = task_executor.clone();
let span = Span::current();
task_executor.spawn_blocking(move || {
let _enter = span.entered();
for (idx, rx) in receivers.into_iter().enumerate() {
for idx in 0..workers_needed {
let ctx = self.clone();
let actions_tx = actions_tx.clone();
let done_tx = done_tx.clone();
let rx = tx_receiver.clone();
let span = debug_span!(target: "engine::tree::payload_processor::prewarm", "prewarm worker", idx);
executor.spawn_blocking(move || {
let _enter = span.entered();
@@ -684,7 +661,7 @@ where
}
});
handles
tx_sender
}
/// Spawns a worker task for BAL slot prefetching.