From dca5852213f65e85ea66d57c043f184381857f9e Mon Sep 17 00:00:00 2001 From: Arsenii Kulikov Date: Fri, 27 Feb 2026 22:39:49 +0400 Subject: [PATCH] perf: share executed tx counter with prewarming (#22647) Co-authored-by: Gancer --- .../tree/src/tree/payload_processor/mod.rs | 18 ++++++++++++++++-- .../tree/src/tree/payload_processor/prewarm.rs | 16 +++++++++++++++- .../engine/tree/src/tree/payload_validator.rs | 12 +++++++++++- 3 files changed, 42 insertions(+), 4 deletions(-) diff --git a/crates/engine/tree/src/tree/payload_processor/mod.rs b/crates/engine/tree/src/tree/payload_processor/mod.rs index f41f3ba86b..a09f1269dd 100644 --- a/crates/engine/tree/src/tree/payload_processor/mod.rs +++ b/crates/engine/tree/src/tree/payload_processor/mod.rs @@ -44,7 +44,7 @@ use reth_trie_sparse::{ use std::{ ops::Not, sync::{ - atomic::AtomicBool, + atomic::{AtomicBool, AtomicUsize}, mpsc::{self, channel}, Arc, }, @@ -485,6 +485,8 @@ where let saved_cache = self.disable_state_cache.not().then(|| self.cache_for(env.parent_hash)); + let executed_tx_index = Arc::new(AtomicUsize::new(0)); + // configure prewarming let prewarm_ctx = PrewarmContext { env, @@ -493,6 +495,7 @@ where provider: provider_builder, metrics: PrewarmMetrics::default(), terminate_execution: Arc::new(AtomicBool::new(false)), + executed_tx_index: Arc::clone(&executed_tx_index), precompile_cache_disabled: self.precompile_cache_disabled, precompile_cache_map: self.precompile_cache_map.clone(), }; @@ -518,7 +521,7 @@ where }); } - CacheTaskHandle { saved_cache, to_prewarm_task: Some(to_prewarm_task) } + CacheTaskHandle { saved_cache, to_prewarm_task: Some(to_prewarm_task), executed_tx_index } } /// Returns the cache for the given parent hash. @@ -867,6 +870,14 @@ impl PayloadHandle { self.prewarm_handle.saved_cache.as_ref().map(|cache| cache.metrics().clone()) } + /// Returns a reference to the shared executed transaction index counter. + /// + /// The main execution loop should store `index + 1` after executing each transaction so that + /// prewarm workers can skip transactions that have already been processed. + pub const fn executed_tx_index(&self) -> &Arc { + &self.prewarm_handle.executed_tx_index + } + /// Terminates the pre-warming transaction processing. /// /// Note: This does not terminate the task yet. @@ -904,6 +915,9 @@ pub struct CacheTaskHandle { saved_cache: Option, /// Channel to the spawned prewarm task if any to_prewarm_task: Option>>, + /// Shared counter tracking the next transaction index to be executed by the main execution + /// loop. Prewarm workers skip transactions below this index. + executed_tx_index: Arc, } impl CacheTaskHandle { diff --git a/crates/engine/tree/src/tree/payload_processor/prewarm.rs b/crates/engine/tree/src/tree/payload_processor/prewarm.rs index dca6118d87..50f93d7fdc 100644 --- a/crates/engine/tree/src/tree/payload_processor/prewarm.rs +++ b/crates/engine/tree/src/tree/payload_processor/prewarm.rs @@ -35,7 +35,7 @@ use reth_revm::{database::StateProviderDatabase, state::EvmState}; use reth_tasks::{pool::WorkerPool, Runtime}; use reth_trie_common::{MultiProofTargetsV2, ProofV2Target}; use std::sync::{ - atomic::{AtomicBool, Ordering}, + atomic::{AtomicBool, AtomicUsize, Ordering}, mpsc::{self, channel, Receiver, Sender}, Arc, }; @@ -157,6 +157,11 @@ where break; } + // skip transactions already executed by the main loop + if index < ctx.executed_tx_index.load(Ordering::Relaxed) { + continue; + } + tx_count += 1; let parent_span = Span::current(); s.spawn(move |_| { @@ -212,6 +217,11 @@ where return; } + // skip if main execution has already processed this transaction + if index < ctx.executed_tx_index.load(Ordering::Relaxed) { + return; + } + let start = Instant::now(); let (tx_env, tx) = tx.into_parts(); @@ -497,6 +507,10 @@ where pub metrics: PrewarmMetrics, /// An atomic bool that tells prewarm tasks to not start any more execution. pub terminate_execution: Arc, + /// Shared counter tracking the next transaction index to be executed by the main execution + /// loop. Prewarm workers skip transactions with `index < counter` since those have already + /// been executed. + pub executed_tx_index: Arc, /// Whether the precompile cache is disabled. pub precompile_cache_disabled: bool, /// The precompile cache map. diff --git a/crates/engine/tree/src/tree/payload_validator.rs b/crates/engine/tree/src/tree/payload_validator.rs index b7fc27b241..65346ace57 100644 --- a/crates/engine/tree/src/tree/payload_validator.rs +++ b/crates/engine/tree/src/tree/payload_validator.rs @@ -50,7 +50,11 @@ use revm_primitives::Address; use std::{ collections::HashMap, panic::{self, AssertUnwindSafe}, - sync::{mpsc::RecvTimeoutError, Arc}, + sync::{ + atomic::{AtomicUsize, Ordering}, + mpsc::RecvTimeoutError, + Arc, + }, }; use tracing::{debug, debug_span, error, info, instrument, trace, warn, Span}; @@ -866,6 +870,7 @@ where .spawn_blocking_named("receipt-root", move || task_handle.run(receipts_len)); let transaction_count = input.transaction_count(); + let executed_tx_index = Arc::clone(handle.executed_tx_index()); let executor = executor.with_state_hook( handle.state_hook().map(|hook| Box::new(hook) as Box), ); @@ -878,6 +883,7 @@ where transaction_count, handle.iter_transactions(), &receipt_tx, + &executed_tx_index, )?; drop(receipt_tx); @@ -917,6 +923,7 @@ where transaction_count: usize, transactions: impl Iterator>, receipt_tx: &crossbeam_channel::Sender>, + executed_tx_index: &AtomicUsize, ) -> Result<(E, Vec
), BlockExecutionError> where E: BlockExecutor, @@ -963,6 +970,9 @@ where executor.execute_transaction(tx)?; self.metrics.record_transaction_execution(tx_start.elapsed()); + // advance the shared counter so prewarm workers skip already-executed txs + executed_tx_index.store(senders.len(), Ordering::Relaxed); + let current_len = executor.receipts().len(); if current_len > last_sent_len { last_sent_len = current_len;