perf: share executed tx counter with prewarming (#22647)

Co-authored-by: Gancer <gancer16@gmail.com>
This commit is contained in:
Arsenii Kulikov
2026-02-27 22:39:49 +04:00
committed by GitHub
parent c94b728af1
commit dca5852213
3 changed files with 42 additions and 4 deletions

View File

@@ -44,7 +44,7 @@ use reth_trie_sparse::{
use std::{
ops::Not,
sync::{
atomic::AtomicBool,
atomic::{AtomicBool, AtomicUsize},
mpsc::{self, channel},
Arc,
},
@@ -485,6 +485,8 @@ where
let saved_cache = self.disable_state_cache.not().then(|| self.cache_for(env.parent_hash));
let executed_tx_index = Arc::new(AtomicUsize::new(0));
// configure prewarming
let prewarm_ctx = PrewarmContext {
env,
@@ -493,6 +495,7 @@ where
provider: provider_builder,
metrics: PrewarmMetrics::default(),
terminate_execution: Arc::new(AtomicBool::new(false)),
executed_tx_index: Arc::clone(&executed_tx_index),
precompile_cache_disabled: self.precompile_cache_disabled,
precompile_cache_map: self.precompile_cache_map.clone(),
};
@@ -518,7 +521,7 @@ where
});
}
CacheTaskHandle { saved_cache, to_prewarm_task: Some(to_prewarm_task) }
CacheTaskHandle { saved_cache, to_prewarm_task: Some(to_prewarm_task), executed_tx_index }
}
/// Returns the cache for the given parent hash.
@@ -867,6 +870,14 @@ impl<Tx, Err, R: Send + Sync + 'static> PayloadHandle<Tx, Err, R> {
self.prewarm_handle.saved_cache.as_ref().map(|cache| cache.metrics().clone())
}
/// Returns a reference to the shared executed transaction index counter.
///
/// The main execution loop should store `index + 1` after executing each transaction so that
/// prewarm workers can skip transactions that have already been processed.
pub const fn executed_tx_index(&self) -> &Arc<AtomicUsize> {
&self.prewarm_handle.executed_tx_index
}
/// Terminates the pre-warming transaction processing.
///
/// Note: This does not terminate the task yet.
@@ -904,6 +915,9 @@ pub struct CacheTaskHandle<R> {
saved_cache: Option<SavedCache>,
/// Channel to the spawned prewarm task if any
to_prewarm_task: Option<std::sync::mpsc::Sender<PrewarmTaskEvent<R>>>,
/// Shared counter tracking the next transaction index to be executed by the main execution
/// loop. Prewarm workers skip transactions below this index.
executed_tx_index: Arc<AtomicUsize>,
}
impl<R: Send + Sync + 'static> CacheTaskHandle<R> {

View File

@@ -35,7 +35,7 @@ use reth_revm::{database::StateProviderDatabase, state::EvmState};
use reth_tasks::{pool::WorkerPool, Runtime};
use reth_trie_common::{MultiProofTargetsV2, ProofV2Target};
use std::sync::{
atomic::{AtomicBool, Ordering},
atomic::{AtomicBool, AtomicUsize, Ordering},
mpsc::{self, channel, Receiver, Sender},
Arc,
};
@@ -157,6 +157,11 @@ where
break;
}
// skip transactions already executed by the main loop
if index < ctx.executed_tx_index.load(Ordering::Relaxed) {
continue;
}
tx_count += 1;
let parent_span = Span::current();
s.spawn(move |_| {
@@ -212,6 +217,11 @@ where
return;
}
// skip if main execution has already processed this transaction
if index < ctx.executed_tx_index.load(Ordering::Relaxed) {
return;
}
let start = Instant::now();
let (tx_env, tx) = tx.into_parts();
@@ -497,6 +507,10 @@ where
pub metrics: PrewarmMetrics,
/// An atomic bool that tells prewarm tasks to not start any more execution.
pub terminate_execution: Arc<AtomicBool>,
/// Shared counter tracking the next transaction index to be executed by the main execution
/// loop. Prewarm workers skip transactions with `index < counter` since those have already
/// been executed.
pub executed_tx_index: Arc<AtomicUsize>,
/// Whether the precompile cache is disabled.
pub precompile_cache_disabled: bool,
/// The precompile cache map.

View File

@@ -50,7 +50,11 @@ use revm_primitives::Address;
use std::{
collections::HashMap,
panic::{self, AssertUnwindSafe},
sync::{mpsc::RecvTimeoutError, Arc},
sync::{
atomic::{AtomicUsize, Ordering},
mpsc::RecvTimeoutError,
Arc,
},
};
use tracing::{debug, debug_span, error, info, instrument, trace, warn, Span};
@@ -866,6 +870,7 @@ where
.spawn_blocking_named("receipt-root", move || task_handle.run(receipts_len));
let transaction_count = input.transaction_count();
let executed_tx_index = Arc::clone(handle.executed_tx_index());
let executor = executor.with_state_hook(
handle.state_hook().map(|hook| Box::new(hook) as Box<dyn OnStateHook>),
);
@@ -878,6 +883,7 @@ where
transaction_count,
handle.iter_transactions(),
&receipt_tx,
&executed_tx_index,
)?;
drop(receipt_tx);
@@ -917,6 +923,7 @@ where
transaction_count: usize,
transactions: impl Iterator<Item = Result<Tx, Err>>,
receipt_tx: &crossbeam_channel::Sender<IndexedReceipt<N::Receipt>>,
executed_tx_index: &AtomicUsize,
) -> Result<(E, Vec<Address>), BlockExecutionError>
where
E: BlockExecutor<Receipt = N::Receipt>,
@@ -963,6 +970,9 @@ where
executor.execute_transaction(tx)?;
self.metrics.record_transaction_execution(tx_start.elapsed());
// advance the shared counter so prewarm workers skip already-executed txs
executed_tx_index.store(senders.len(), Ordering::Relaxed);
let current_len = executor.receipts().len();
if current_len > last_sent_len {
last_sent_len = current_len;