diff --git a/crates/engine/tree/src/tree/payload_processor/mod.rs b/crates/engine/tree/src/tree/payload_processor/mod.rs index 8558a7479e..af9a2548ce 100644 --- a/crates/engine/tree/src/tree/payload_processor/mod.rs +++ b/crates/engine/tree/src/tree/payload_processor/mod.rs @@ -31,6 +31,7 @@ use reth_trie_parallel::{ use std::{ collections::VecDeque, sync::{ + atomic::AtomicBool, mpsc, mpsc::{channel, Sender}, Arc, @@ -251,6 +252,7 @@ where cache_metrics: cache_metrics.clone(), provider: provider_builder, metrics: PrewarmMetrics::default(), + terminate_execution: Arc::new(AtomicBool::new(false)), }; let prewarm_task = PrewarmCacheTask::new( diff --git a/crates/engine/tree/src/tree/payload_processor/prewarm.rs b/crates/engine/tree/src/tree/payload_processor/prewarm.rs index 07502141a0..4d94f7b84c 100644 --- a/crates/engine/tree/src/tree/payload_processor/prewarm.rs +++ b/crates/engine/tree/src/tree/payload_processor/prewarm.rs @@ -20,7 +20,11 @@ use reth_revm::{database::StateProviderDatabase, db::BundleState, state::EvmStat use reth_trie::MultiProofTargets; use std::{ collections::VecDeque, - sync::mpsc::{channel, Receiver, Sender}, + sync::{ + atomic::{AtomicBool, Ordering}, + mpsc::{channel, Receiver, Sender}, + Arc, + }, time::Instant, }; use tracing::{debug, trace}; @@ -46,6 +50,8 @@ pub(super) struct PrewarmCacheTask { actions_rx: Receiver, /// Sender the transactions use to send their result back actions_tx: Sender, + /// Total prewarming tasks spawned + prewarm_outcomes_left: usize, } impl PrewarmCacheTask @@ -72,6 +78,7 @@ where to_multi_proof, actions_rx, actions_tx, + prewarm_outcomes_left: 0, } } @@ -89,6 +96,7 @@ where let ctx = self.ctx.clone(); let pending_chunk = chunk.collect::>(); + self.prewarm_outcomes_left += pending_chunk.len(); self.executor.spawn_blocking(move || { ctx.transact_batch(&pending_chunk, sender); }); @@ -103,7 +111,7 @@ where } /// Save the state to the shared cache for the given block. - fn save_cache(&self, state: BundleState) { + fn save_cache(self, state: BundleState) { let start = Instant::now(); let cache = SavedCache::new( self.ctx.header.hash(), @@ -123,6 +131,15 @@ where self.ctx.metrics.cache_saving_duration.set(start.elapsed().as_secs_f64()); } + /// Removes the `actions_tx` currently stored in the struct, replacing it with a new one that + /// does not point to any active receiver. + /// + /// This is used to drop the `actions_tx` after all tasks have been spawned, and should not be + /// used in any context other than the `run` method. + fn drop_actions_tx(&mut self) { + self.actions_tx = channel().0; + } + /// Executes the task. /// /// This will execute the transactions until all transactions have been processed or the task @@ -134,26 +151,45 @@ where // spawn execution tasks. self.spawn_all(); + // drop the actions sender after we've spawned all execution tasks. This is so that the + // following loop can terminate even if one of the prewarm tasks ends in an error (i.e., + // does not return an Outcome) or panics. + self.drop_actions_tx(); + + let mut final_block_output = None; while let Ok(event) = self.actions_rx.recv() { match event { PrewarmTaskEvent::TerminateTransactionExecution => { // stop tx processing - self.pending.clear(); + self.ctx.terminate_execution.store(true, Ordering::Relaxed); } PrewarmTaskEvent::Outcome { proof_targets } => { - // completed a transaction, frees up one slot + // completed executing a set of transactions self.send_multi_proof_targets(proof_targets); + + // decrement the number of tasks left + self.prewarm_outcomes_left -= 1; + + if self.prewarm_outcomes_left == 0 && final_block_output.is_some() { + // all tasks are done, and we have the block output, we can exit + break + } } PrewarmTaskEvent::Terminate { block_output } => { - // terminate the task - if let Some(state) = block_output { - self.save_cache(state); - } + final_block_output = Some(block_output); - break + if self.prewarm_outcomes_left == 0 { + // all tasks are done, we can exit, which will save caches and exit + break + } } } } + + // save caches and finish + if let Some(Some(state)) = final_block_output { + self.save_cache(state); + } } } @@ -167,6 +203,8 @@ pub(super) struct PrewarmContext { /// Provider to obtain the state pub(super) provider: StateProviderBuilder, pub(super) metrics: PrewarmMetrics, + /// An atomic bool that tells prewarm tasks to not start any more execution. + pub(super) terminate_execution: Arc, } impl PrewarmContext @@ -175,9 +213,20 @@ where P: BlockReader + StateProviderFactory + StateReader + StateCommitmentProvider + Clone + 'static, Evm: ConfigureEvm + 'static, { - /// Splits this context into an evm, an evm config, and metrics. - fn evm_for_ctx(self) -> Option<(EvmFor, Evm, PrewarmMetrics)> { - let Self { header, evm_config, cache: caches, cache_metrics, provider, metrics } = self; + /// Splits this context into an evm, an evm config, metrics, and the atomic bool for terminating + /// execution. + fn evm_for_ctx( + self, + ) -> Option<(EvmFor, Evm, PrewarmMetrics, Arc)> { + let Self { + header, + evm_config, + cache: caches, + cache_metrics, + provider, + metrics, + terminate_execution, + } = self; let state_provider = match provider.build() { Ok(provider) => provider, @@ -206,7 +255,7 @@ where // create a new executor and disable nonce checks in the env let evm = evm_config.evm_with_env(state_provider, evm_env); - Some((evm, evm_config, metrics)) + Some((evm, evm_config, metrics, terminate_execution)) } /// Transacts the vec of transactions and returns the state outcome. @@ -217,9 +266,18 @@ where /// Note: Since here are no ordering guarantees this won't the state the txs produce when /// executed sequentially. fn transact_batch(self, txs: &[Recovered], sender: Sender) { - let Some((mut evm, evm_config, metrics)) = self.evm_for_ctx() else { return }; + let Some((mut evm, evm_config, metrics, terminate_execution)) = self.evm_for_ctx() else { + return + }; for tx in txs { + // If the task was cancelled, stop execution, send an empty result to notify the task, + // and exit. + if terminate_execution.load(Ordering::Relaxed) { + let _ = sender.send(PrewarmTaskEvent::Outcome { proof_targets: None }); + return + } + // create the tx env let tx_env = evm_config.tx_env(tx); let start = Instant::now();