diff --git a/crates/engine/tree/src/tree/payload_processor/mod.rs b/crates/engine/tree/src/tree/payload_processor/mod.rs
index 7392411705..f2147ce93b 100644
--- a/crates/engine/tree/src/tree/payload_processor/mod.rs
+++ b/crates/engine/tree/src/tree/payload_processor/mod.rs
@@ -668,10 +668,12 @@ impl PayloadHandle {
     /// If the [`BlockExecutionOutput`] is provided it will update the shared cache using its
     /// bundle state. Using `Arc` allows sharing with the main execution
     /// path without cloning the expensive `BundleState`.
+    ///
+    /// Returns a sender for the channel that should be notified on block validation success.
     pub(super) fn terminate_caching(
         &mut self,
         execution_outcome: Option<Arc<BlockExecutionOutput<N::Receipt>>>,
-    ) {
+    ) -> Option<mpsc::Sender<()>> {
         self.prewarm_handle.terminate_caching(execution_outcome)
     }
 
@@ -709,13 +711,19 @@ impl CacheTaskHandle {
     ///
     /// If the [`BlockExecutionOutput`] is provided it will update the shared cache using its
     /// bundle state. Using `Arc` avoids cloning the expensive `BundleState`.
+    #[must_use = "sender must be used and notified on block validation success"]
     pub(super) fn terminate_caching(
         &mut self,
         execution_outcome: Option<Arc<BlockExecutionOutput<N::Receipt>>>,
-    ) {
+    ) -> Option<mpsc::Sender<()>> {
         if let Some(tx) = self.to_prewarm_task.take() {
-            let event = PrewarmTaskEvent::Terminate { execution_outcome };
+            let (valid_block_tx, valid_block_rx) = mpsc::channel();
+            let event = PrewarmTaskEvent::Terminate { execution_outcome, valid_block_rx };
             let _ = tx.send(event);
+
+            Some(valid_block_tx)
+        } else {
+            None
         }
     }
 }
@@ -724,7 +732,10 @@ impl Drop for CacheTaskHandle {
     fn drop(&mut self) {
         // Ensure we always terminate on drop - send None without needing Send + Sync bounds
         if let Some(tx) = self.to_prewarm_task.take() {
-            let _ = tx.send(PrewarmTaskEvent::Terminate { execution_outcome: None });
+            let _ = tx.send(PrewarmTaskEvent::Terminate {
+                execution_outcome: None,
+                valid_block_rx: mpsc::channel().1,
+            });
         }
     }
 }
diff --git a/crates/engine/tree/src/tree/payload_processor/prewarm.rs b/crates/engine/tree/src/tree/payload_processor/prewarm.rs
index 99c689dd19..aecc224965 100644
--- a/crates/engine/tree/src/tree/payload_processor/prewarm.rs
+++ b/crates/engine/tree/src/tree/payload_processor/prewarm.rs
@@ -261,7 +261,11 @@ where
     ///
    /// This method is called from `run()` only after all execution tasks are complete.
     #[instrument(level = "debug", target = "engine::tree::payload_processor::prewarm", skip_all)]
-    fn save_cache(self, execution_outcome: Arc<BlockExecutionOutput<N::Receipt>>) {
+    fn save_cache(
+        self,
+        execution_outcome: Arc<BlockExecutionOutput<N::Receipt>>,
+        valid_block_rx: mpsc::Receiver<()>,
+    ) {
         let start = Instant::now();
 
         let Self { execution_cache, ctx: PrewarmContext { env, metrics, saved_cache, .. }, .. } =
@@ -288,9 +292,11 @@
 
             new_cache.update_metrics();
 
-            // Replace the shared cache with the new one; the previous cache (if any) is
-            // dropped.
-            *cached = Some(new_cache);
+            if valid_block_rx.recv().is_ok() {
+                // Replace the shared cache with the new one; the previous cache (if any) is
+                // dropped.
+                *cached = Some(new_cache);
+            }
         });
 
         let elapsed = start.elapsed();
@@ -421,9 +427,10 @@ where
                     // completed executing a set of transactions
                     self.send_multi_proof_targets(proof_targets);
                 }
-                PrewarmTaskEvent::Terminate { execution_outcome } => {
+                PrewarmTaskEvent::Terminate { execution_outcome, valid_block_rx } => {
                     trace!(target: "engine::tree::payload_processor::prewarm", "Received termination signal");
-                    final_execution_outcome = Some(execution_outcome);
+                    final_execution_outcome =
+                        Some(execution_outcome.map(|outcome| (outcome, valid_block_rx)));
 
                     if finished_execution {
                         // all tasks are done, we can exit, which will save caches and exit
@@ -448,8 +455,8 @@ where
         debug!(target: "engine::tree::payload_processor::prewarm", "Completed prewarm execution");
 
         // save caches and finish using the shared ExecutionOutcome
-        if let Some(Some(execution_outcome)) = final_execution_outcome {
-            self.save_cache(execution_outcome);
+        if let Some(Some((execution_outcome, valid_block_rx))) = final_execution_outcome {
+            self.save_cache(execution_outcome, valid_block_rx);
         }
     }
 }
@@ -813,6 +820,11 @@ pub(super) enum PrewarmTaskEvent {
         /// The final execution outcome. Using `Arc` allows sharing with the main execution
         /// path without cloning the expensive `BundleState`.
         execution_outcome: Option<Arc<BlockExecutionOutput<N::Receipt>>>,
+        /// Receiver for the block validation result.
+        ///
+        /// Cache saving is racing the state root validation. We optimistically construct the
+        /// updated cache but only save it once we know the block is valid.
+        valid_block_rx: mpsc::Receiver<()>,
     },
     /// The outcome of a pre-warm task
     Outcome {
diff --git a/crates/engine/tree/src/tree/payload_validator.rs b/crates/engine/tree/src/tree/payload_validator.rs
index 9160535df6..6575463f0c 100644
--- a/crates/engine/tree/src/tree/payload_validator.rs
+++ b/crates/engine/tree/src/tree/payload_validator.rs
@@ -462,6 +462,15 @@ where
         // After executing the block we can stop prewarming transactions
         handle.stop_prewarming_execution();
 
+        // Create ExecutionOutcome early so we can terminate caching before validation and state
+        // root computation. Using Arc allows sharing with both the caching task and the deferred
+        // trie task without cloning the expensive BundleState.
+        let output = Arc::new(output);
+
+        // Terminate caching task early since execution is complete and caching is no longer
+        // needed. This frees up resources while state root computation continues.
+        let valid_block_tx = handle.terminate_caching(Some(output.clone()));
+
         let block = self.convert_to_block(input)?.with_senders(senders);
 
         let hashed_state = ensure_ok_post_block!(
@@ -564,16 +573,13 @@ where
             .into())
         }
 
-        // Create ExecutionOutcome and wrap in Arc for sharing with both the caching task
-        // and the deferred trie task. This avoids cloning the expensive BundleState.
-        let execution_outcome = Arc::new(output);
-
-        // Terminate prewarming task with the shared execution outcome
-        handle.terminate_caching(Some(Arc::clone(&execution_outcome)));
+        if let Some(valid_block_tx) = valid_block_tx {
+            let _ = valid_block_tx.send(());
+        }
 
         Ok(self.spawn_deferred_trie_task(
             block,
-            execution_outcome,
+            output,
             &ctx,
             hashed_state,
             trie_output,
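
Context for reviewers: the change turns cache publication into a small handshake. `terminate_caching` now hands back the sending half of a `std::sync::mpsc` channel, and the prewarm task blocks on the receiving half before installing the optimistically built cache; if the sender is dropped without sending (as in the `Drop` impl, which passes `mpsc::channel().1` so the paired sender is discarded immediately), `recv()` returns `Err` and the cache is discarded. Below is a minimal, self-contained sketch of that pattern; `NewCache`, the spawned thread, and `block_is_valid` are illustrative stand-ins, not the real reth types or control flow.

```rust
use std::{sync::mpsc, thread};

/// Illustrative stand-in for the optimistically built cache.
struct NewCache(&'static str);

fn main() {
    let (valid_block_tx, valid_block_rx) = mpsc::channel::<()>();

    // "Prewarm" side: build the cache up front, but only publish it once the
    // block has been reported valid. If the sender is dropped without sending,
    // `recv()` returns Err and the cache is thrown away.
    let saver = thread::spawn(move || {
        let new_cache = NewCache("built from bundle state");
        if valid_block_rx.recv().is_ok() {
            println!("block valid, installing cache: {}", new_cache.0);
        } else {
            println!("validation failed or skipped, discarding cache");
        }
    });

    // "Validator" side: signal success only after post-execution checks pass.
    let block_is_valid = true;
    if block_is_valid {
        let _ = valid_block_tx.send(());
    } else {
        // Dropping the sender without sending unblocks the saver with Err.
        drop(valid_block_tx);
    }

    saver.join().unwrap();
}
```

This mirrors the diff: `payload_validator.rs` only calls `valid_block_tx.send(())` after validation succeeds, so a cache built during an abandoned or failed validation attempt is never installed over the shared one.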