mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-04-08 03:01:12 -04:00
perf: spawn prewarm workers in parallel (#20575)
This commit is contained in:
@@ -164,12 +164,7 @@ where
|
||||
};
|
||||
|
||||
// Initialize worker handles container
|
||||
let mut handles = Vec::with_capacity(workers_needed);
|
||||
|
||||
// Only spawn initial workers as needed
|
||||
for i in 0..workers_needed {
|
||||
handles.push(ctx.spawn_worker(i, &executor, actions_tx.clone(), done_tx.clone()));
|
||||
}
|
||||
let handles = ctx.clone().spawn_workers(workers_needed, &executor, actions_tx.clone(), done_tx.clone());
|
||||
|
||||
// Distribute transactions to workers
|
||||
let mut tx_index = 0usize;
|
||||
@@ -536,27 +531,43 @@ where
|
||||
}
|
||||
|
||||
/// Spawns a worker task for transaction execution and returns its sender channel.
|
||||
fn spawn_worker<Tx>(
|
||||
&self,
|
||||
idx: usize,
|
||||
executor: &WorkloadExecutor,
|
||||
fn spawn_workers<Tx>(
|
||||
self,
|
||||
workers_needed: usize,
|
||||
task_executor: &WorkloadExecutor,
|
||||
actions_tx: Sender<PrewarmTaskEvent<N::Receipt>>,
|
||||
done_tx: Sender<()>,
|
||||
) -> mpsc::Sender<IndexedTransaction<Tx>>
|
||||
) -> Vec<mpsc::Sender<IndexedTransaction<Tx>>>
|
||||
where
|
||||
Tx: ExecutableTxFor<Evm> + Send + 'static,
|
||||
{
|
||||
let (tx, rx) = mpsc::channel();
|
||||
let ctx = self.clone();
|
||||
let span =
|
||||
debug_span!(target: "engine::tree::payload_processor::prewarm", "prewarm worker", idx);
|
||||
let mut handles = Vec::with_capacity(workers_needed);
|
||||
let mut receivers = Vec::with_capacity(workers_needed);
|
||||
|
||||
executor.spawn_blocking(move || {
|
||||
for _ in 0..workers_needed {
|
||||
let (tx, rx) = mpsc::channel();
|
||||
handles.push(tx);
|
||||
receivers.push(rx);
|
||||
}
|
||||
|
||||
// Spawn a separate task spawning workers in parallel.
|
||||
let executor = task_executor.clone();
|
||||
let span = Span::current();
|
||||
task_executor.spawn_blocking(move || {
|
||||
let _enter = span.entered();
|
||||
ctx.transact_batch(rx, actions_tx, done_tx);
|
||||
for (idx, rx) in receivers.into_iter().enumerate() {
|
||||
let ctx = self.clone();
|
||||
let actions_tx = actions_tx.clone();
|
||||
let done_tx = done_tx.clone();
|
||||
let span = debug_span!(target: "engine::tree::payload_processor::prewarm", "prewarm worker", idx);
|
||||
executor.spawn_blocking(move || {
|
||||
let _enter = span.entered();
|
||||
ctx.transact_batch(rx, actions_tx, done_tx);
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
tx
|
||||
handles
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user