From 1383c151c9b9e866830d3826d3c6cb9ec11be506 Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Fri, 6 Feb 2026 16:17:15 +0000 Subject: [PATCH] perf(engine): send proofs directly from prewarming to multiproof task (#21901) --- .../src/tree/payload_processor/prewarm.rs | 55 +++++-------------- 1 file changed, 14 insertions(+), 41 deletions(-) diff --git a/crates/engine/tree/src/tree/payload_processor/prewarm.rs b/crates/engine/tree/src/tree/payload_processor/prewarm.rs index 4aa95c3c3b..dad00c57db 100644 --- a/crates/engine/tree/src/tree/payload_processor/prewarm.rs +++ b/crates/engine/tree/src/tree/payload_processor/prewarm.rs @@ -138,6 +138,7 @@ where &self, pending: mpsc::Receiver, actions_tx: Sender>, + to_multi_proof: Option>, ) where Tx: ExecutableTxFor + Clone + Send + 'static, { @@ -162,7 +163,7 @@ where }; // Spawn workers - let tx_sender = ctx.clone().spawn_workers(workers_needed, &executor, actions_tx.clone(), done_tx.clone()); + let tx_sender = ctx.clone().spawn_workers(workers_needed, &executor, to_multi_proof, done_tx.clone()); // Distribute transactions to workers let mut tx_index = 0usize; @@ -196,24 +197,6 @@ where }); } - /// Returns true if prewarming was terminated and no more transactions should be prewarmed. - fn is_execution_terminated(&self) -> bool { - self.ctx.terminate_execution.load(Ordering::Relaxed) - } - - /// If configured and the tx returned proof targets, emit the targets the transaction produced - fn send_multi_proof_targets(&self, targets: Option) { - if self.is_execution_terminated() { - // if execution is already terminated then we dont need to send more proof fetch - // messages - return - } - - if let Some((proof_targets, to_multi_proof)) = targets.zip(self.to_multi_proof.as_ref()) { - let _ = to_multi_proof.send(MultiProofMessage::PrefetchProofs(proof_targets)); - } - } - /// This method calls `ExecutionCache::update_with_guard` which requires exclusive access. /// It should only be called after ensuring that: /// 1. All prewarming tasks have completed execution @@ -375,7 +358,7 @@ where // Spawn execution tasks based on mode match mode { PrewarmMode::Transactions(pending) => { - self.spawn_all(pending, actions_tx); + self.spawn_all(pending, actions_tx, self.to_multi_proof.clone()); } PrewarmMode::BlockAccessList(bal) => { self.run_bal_prewarm(bal, actions_tx); @@ -391,10 +374,6 @@ where debug!(target: "engine::tree::prewarm", "Terminating prewarm execution"); self.ctx.terminate_execution.store(true, Ordering::Relaxed); } - PrewarmTaskEvent::Outcome { proof_targets } => { - // completed executing a set of transactions - self.send_multi_proof_targets(proof_targets); - } PrewarmTaskEvent::Terminate { execution_outcome, valid_block_rx } => { trace!(target: "engine::tree::payload_processor::prewarm", "Received termination signal"); final_execution_outcome = @@ -531,7 +510,8 @@ where } /// Accepts a [`CrossbeamReceiver`] of transactions and a handle to prewarm task. Executes - /// transactions and streams [`PrewarmTaskEvent::Outcome`] messages for each transaction. + /// transactions and streams [`MultiProofMessage::PrefetchProofs`] messages for each + /// transaction. /// /// This function processes transactions sequentially from the receiver and emits outcome events /// via the provided sender. Execution errors are logged and tracked but do not stop the batch @@ -543,7 +523,7 @@ where fn transact_batch( self, txs: CrossbeamReceiver>, - sender: Sender>, + to_multi_proof: Option>, done_tx: Sender<()>, ) where Tx: ExecutableTxFor, @@ -571,10 +551,8 @@ where // create the tx env let start = Instant::now(); - // If the task was cancelled, stop execution, send an empty result to notify the task, - // and exit. + // If the task was cancelled, stop execution, and exit. if terminate_execution.load(Ordering::Relaxed) { - let _ = sender.send(PrewarmTaskEvent::Outcome { proof_targets: None }); break } @@ -603,10 +581,8 @@ where drop(enter); - // If the task was cancelled, stop execution, send an empty result to notify the task, - // and exit. + // If the task was cancelled, stop execution, and exit. if terminate_execution.load(Ordering::Relaxed) { - let _ = sender.send(PrewarmTaskEvent::Outcome { proof_targets: None }); break } @@ -619,7 +595,9 @@ where let (targets, storage_targets) = multiproof_targets_from_state(res.state, v2_proofs_enabled); metrics.prefetch_storage_targets.record(storage_targets as f64); - let _ = sender.send(PrewarmTaskEvent::Outcome { proof_targets: Some(targets) }); + if let Some(to_multi_proof) = &to_multi_proof { + let _ = to_multi_proof.send(MultiProofMessage::PrefetchProofs(targets)); + } drop(_enter); } @@ -637,7 +615,7 @@ where self, workers_needed: usize, task_executor: &WorkloadExecutor, - actions_tx: Sender>, + to_multi_proof: Option>, done_tx: Sender<()>, ) -> CrossbeamSender> where @@ -652,13 +630,13 @@ where let _enter = span.entered(); for idx in 0..workers_needed { let ctx = self.clone(); - let actions_tx = actions_tx.clone(); + let to_multi_proof = to_multi_proof.clone(); let done_tx = done_tx.clone(); let rx = tx_receiver.clone(); let span = debug_span!(target: "engine::tree::payload_processor::prewarm", "prewarm worker", idx); executor.spawn_blocking(move || { let _enter = span.entered(); - ctx.transact_batch(rx, actions_tx, done_tx); + ctx.transact_batch(rx, to_multi_proof, done_tx); }); } }); @@ -869,11 +847,6 @@ pub enum PrewarmTaskEvent { /// updated cache but only save it once we know the block is valid. valid_block_rx: mpsc::Receiver<()>, }, - /// The outcome of a pre-warm task - Outcome { - /// The prepared proof targets based on the evm state outcome - proof_targets: Option, - }, /// Finished executing all transactions FinishedTxExecution { /// Number of transactions executed