perf(engine): share Arc<ExecutionOutcome> to avoid cloning BundleState (#20448)

This commit is contained in:
YK
2025-12-18 09:07:18 +08:00
committed by GitHub
parent 432ac7afa1
commit 453514c48f
3 changed files with 85 additions and 56 deletions

View File

@@ -23,11 +23,12 @@ use multiproof::{SparseTrieUpdate, *};
use parking_lot::RwLock;
use prewarm::PrewarmMetrics;
use rayon::prelude::*;
use reth_engine_primitives::ExecutableTxIterator;
use reth_evm::{
execute::{ExecutableTxFor, WithTxEnv},
ConfigureEvm, EvmEnvFor, OnStateHook, SpecFor, TxEnvFor,
ConfigureEvm, EvmEnvFor, ExecutableTxIterator, ExecutableTxTuple, OnStateHook, SpecFor,
TxEnvFor,
};
use reth_execution_types::ExecutionOutcome;
use reth_primitives_traits::NodePrimitives;
use reth_provider::{BlockReader, DatabaseProviderROFactory, StateProviderFactory, StateReader};
use reth_revm::{db::BundleState, state::EvmState};
@@ -92,6 +93,13 @@ pub const SPARSE_TRIE_MAX_NODES_SHRINK_CAPACITY: usize = 1_000_000;
/// 144MB.
pub const SPARSE_TRIE_MAX_VALUES_SHRINK_CAPACITY: usize = 1_000_000;
/// Type alias for [`PayloadHandle`] returned by payload processor spawn methods.
type IteratorPayloadHandle<Evm, I, N> = PayloadHandle<
WithTxEnv<TxEnvFor<Evm>, <I as ExecutableTxTuple>::Tx>,
<I as ExecutableTxTuple>::Error,
<N as NodePrimitives>::Receipt,
>;
/// Entrypoint for executing the payload.
#[derive(Debug)]
pub struct PayloadProcessor<Evm>
@@ -200,7 +208,6 @@ where
///
/// This returns a handle to await the final state root and to interact with the tasks (e.g.
/// canceling)
#[allow(clippy::type_complexity)]
#[instrument(
level = "debug",
target = "engine::tree::payload_processor",
@@ -215,7 +222,7 @@ where
multiproof_provider_factory: F,
config: &TreeConfig,
bal: Option<Arc<BlockAccessList>>,
) -> PayloadHandle<WithTxEnv<TxEnvFor<Evm>, I::Tx>, I::Error>
) -> IteratorPayloadHandle<Evm, I, N>
where
P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
F: DatabaseProviderROFactory<Provider: TrieCursorFactory + HashedCursorFactory>
@@ -320,7 +327,7 @@ where
env: ExecutionEnv<Evm>,
transactions: I,
provider_builder: StateProviderBuilder<N, P>,
) -> PayloadHandle<WithTxEnv<TxEnvFor<Evm>, I::Tx>, I::Error>
) -> IteratorPayloadHandle<Evm, I, N>
where
P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
{
@@ -400,7 +407,7 @@ where
transaction_count_hint: usize,
provider_builder: StateProviderBuilder<N, P>,
to_multi_proof: Option<CrossbeamSender<MultiProofMessage>>,
) -> CacheTaskHandle
) -> CacheTaskHandle<N::Receipt>
where
P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
{
@@ -581,12 +588,15 @@ where
}
/// Handle to all the spawned tasks.
///
/// Generic over `R` (receipt type) to allow sharing `Arc<ExecutionOutcome<R>>` with the
/// caching task without cloning the expensive `BundleState`.
#[derive(Debug)]
pub struct PayloadHandle<Tx, Err> {
pub struct PayloadHandle<Tx, Err, R> {
/// Channel for evm state updates
to_multi_proof: Option<CrossbeamSender<MultiProofMessage>>,
// must include the receiver of the state root wired to the sparse trie
prewarm_handle: CacheTaskHandle,
prewarm_handle: CacheTaskHandle<R>,
/// Stream of block transactions
transactions: mpsc::Receiver<Result<Tx, Err>>,
/// Receiver for the state root
@@ -595,7 +605,7 @@ pub struct PayloadHandle<Tx, Err> {
_span: Span,
}
impl<Tx, Err> PayloadHandle<Tx, Err> {
impl<Tx, Err, R: Send + Sync + 'static> PayloadHandle<Tx, Err, R> {
/// Awaits the state root
///
/// # Panics
@@ -648,9 +658,14 @@ impl<Tx, Err> PayloadHandle<Tx, Err> {
/// Terminates the entire caching task.
///
/// If the [`BundleState`] is provided it will update the shared cache.
pub(super) fn terminate_caching(&mut self, block_output: Option<&BundleState>) {
self.prewarm_handle.terminate_caching(block_output)
/// If the [`ExecutionOutcome`] is provided it will update the shared cache using its
/// bundle state. Using `Arc<ExecutionOutcome>` allows sharing with the main execution
/// path without cloning the expensive `BundleState`.
pub(super) fn terminate_caching(
&mut self,
execution_outcome: Option<Arc<ExecutionOutcome<R>>>,
) {
self.prewarm_handle.terminate_caching(execution_outcome)
}
/// Returns iterator yielding transactions from the stream.
@@ -662,17 +677,20 @@ impl<Tx, Err> PayloadHandle<Tx, Err> {
}
/// Access to the spawned [`PrewarmCacheTask`].
///
/// Generic over `R` (receipt type) to allow sharing `Arc<ExecutionOutcome<R>>` with the
/// prewarm task without cloning the expensive `BundleState`.
#[derive(Debug)]
pub(crate) struct CacheTaskHandle {
pub(crate) struct CacheTaskHandle<R> {
/// The shared cache the task operates with.
cache: Option<StateExecutionCache>,
/// Metrics for the caches
cache_metrics: Option<CachedStateMetrics>,
/// Channel to the spawned prewarm task if any
to_prewarm_task: Option<std::sync::mpsc::Sender<PrewarmTaskEvent>>,
to_prewarm_task: Option<std::sync::mpsc::Sender<PrewarmTaskEvent<R>>>,
}
impl CacheTaskHandle {
impl<R: Send + Sync + 'static> CacheTaskHandle<R> {
/// Terminates the pre-warming transaction processing.
///
/// Note: This does not terminate the task yet.
@@ -684,20 +702,25 @@ impl CacheTaskHandle {
/// Terminates the entire pre-warming task.
///
/// If the [`BundleState`] is provided it will update the shared cache.
pub(super) fn terminate_caching(&mut self, block_output: Option<&BundleState>) {
/// If the [`ExecutionOutcome`] is provided it will update the shared cache using its
/// bundle state. Using `Arc<ExecutionOutcome>` avoids cloning the expensive `BundleState`.
pub(super) fn terminate_caching(
&mut self,
execution_outcome: Option<Arc<ExecutionOutcome<R>>>,
) {
if let Some(tx) = self.to_prewarm_task.take() {
// Only clone when we have an active task and a state to send
let event = PrewarmTaskEvent::Terminate { block_output: block_output.cloned() };
let event = PrewarmTaskEvent::Terminate { execution_outcome };
let _ = tx.send(event);
}
}
}
impl Drop for CacheTaskHandle {
impl<R> Drop for CacheTaskHandle<R> {
fn drop(&mut self) {
// Ensure we always terminate on drop
self.terminate_caching(None);
// Ensure we always terminate on drop - send None without needing Send + Sync bounds
if let Some(tx) = self.to_prewarm_task.take() {
let _ = tx.send(PrewarmTaskEvent::Terminate { execution_outcome: None });
}
}
}

View File

@@ -27,10 +27,11 @@ use alloy_primitives::{keccak256, map::B256Set, B256};
use crossbeam_channel::Sender as CrossbeamSender;
use metrics::{Counter, Gauge, Histogram};
use reth_evm::{execute::ExecutableTxFor, ConfigureEvm, Evm, EvmFor, SpecFor};
use reth_execution_types::ExecutionOutcome;
use reth_metrics::Metrics;
use reth_primitives_traits::NodePrimitives;
use reth_provider::{BlockReader, StateProviderFactory, StateReader};
use reth_revm::{database::StateProviderDatabase, db::BundleState, state::EvmState};
use reth_revm::{database::StateProviderDatabase, state::EvmState};
use reth_trie::MultiProofTargets;
use std::{
sync::{
@@ -86,7 +87,7 @@ where
/// Sender to emit evm state outcome messages, if any.
to_multi_proof: Option<CrossbeamSender<MultiProofMessage>>,
/// Receiver for events produced by tx execution
actions_rx: Receiver<PrewarmTaskEvent>,
actions_rx: Receiver<PrewarmTaskEvent<N::Receipt>>,
/// Parent span for tracing
parent_span: Span,
}
@@ -105,7 +106,7 @@ where
to_multi_proof: Option<CrossbeamSender<MultiProofMessage>>,
transaction_count_hint: usize,
max_concurrency: usize,
) -> (Self, Sender<PrewarmTaskEvent>) {
) -> (Self, Sender<PrewarmTaskEvent<N::Receipt>>) {
let (actions_tx, actions_rx) = channel();
trace!(
@@ -135,8 +136,11 @@ where
/// For Optimism chains, special handling is applied to the first transaction if it's a
/// deposit transaction (type 0x7E/126) which sets critical metadata that affects all
/// subsequent transactions in the block.
fn spawn_all<Tx>(&self, pending: mpsc::Receiver<Tx>, actions_tx: Sender<PrewarmTaskEvent>)
where
fn spawn_all<Tx>(
&self,
pending: mpsc::Receiver<Tx>,
actions_tx: Sender<PrewarmTaskEvent<N::Receipt>>,
) where
Tx: ExecutableTxFor<Evm> + Clone + Send + 'static,
{
let executor = self.executor.clone();
@@ -248,7 +252,7 @@ where
///
/// This method is called from `run()` only after all execution tasks are complete.
#[instrument(level = "debug", target = "engine::tree::payload_processor::prewarm", skip_all)]
fn save_cache(self, state: BundleState) {
fn save_cache(self, execution_outcome: Arc<ExecutionOutcome<N::Receipt>>) {
let start = Instant::now();
let Self { execution_cache, ctx: PrewarmContext { env, metrics, saved_cache, .. }, .. } =
@@ -265,7 +269,8 @@ where
let new_cache = SavedCache::new(hash, caches, cache_metrics);
// Insert state into cache while holding the lock
if new_cache.cache().insert_state(&state).is_err() {
// Access the BundleState through the shared ExecutionOutcome
if new_cache.cache().insert_state(execution_outcome.state()).is_err() {
// Clear the cache on error to prevent having a polluted cache
*cached = None;
debug!(target: "engine::caching", "cleared execution cache on update error");
@@ -300,12 +305,12 @@ where
pub(super) fn run(
self,
pending: mpsc::Receiver<impl ExecutableTxFor<Evm> + Clone + Send + 'static>,
actions_tx: Sender<PrewarmTaskEvent>,
actions_tx: Sender<PrewarmTaskEvent<N::Receipt>>,
) {
// spawn execution tasks.
self.spawn_all(pending, actions_tx);
let mut final_block_output = None;
let mut final_execution_outcome = None;
let mut finished_execution = false;
while let Ok(event) = self.actions_rx.recv() {
match event {
@@ -318,9 +323,9 @@ where
// completed executing a set of transactions
self.send_multi_proof_targets(proof_targets);
}
PrewarmTaskEvent::Terminate { block_output } => {
PrewarmTaskEvent::Terminate { execution_outcome } => {
trace!(target: "engine::tree::payload_processor::prewarm", "Received termination signal");
final_block_output = Some(block_output);
final_execution_outcome = Some(execution_outcome);
if finished_execution {
// all tasks are done, we can exit, which will save caches and exit
@@ -334,7 +339,7 @@ where
finished_execution = true;
if final_block_output.is_some() {
if final_execution_outcome.is_some() {
// all tasks are done, we can exit, which will save caches and exit
break
}
@@ -344,9 +349,9 @@ where
debug!(target: "engine::tree::payload_processor::prewarm", "Completed prewarm execution");
// save caches and finish
if let Some(Some(state)) = final_block_output {
self.save_cache(state);
// save caches and finish using the shared ExecutionOutcome
if let Some(Some(execution_outcome)) = final_execution_outcome {
self.save_cache(execution_outcome);
}
}
}
@@ -454,7 +459,7 @@ where
fn transact_batch<Tx>(
self,
txs: mpsc::Receiver<IndexedTransaction<Tx>>,
sender: Sender<PrewarmTaskEvent>,
sender: Sender<PrewarmTaskEvent<N::Receipt>>,
done_tx: Sender<()>,
) where
Tx: ExecutableTxFor<Evm>,
@@ -535,7 +540,7 @@ where
&self,
idx: usize,
executor: &WorkloadExecutor,
actions_tx: Sender<PrewarmTaskEvent>,
actions_tx: Sender<PrewarmTaskEvent<N::Receipt>>,
done_tx: Sender<()>,
) -> mpsc::Sender<IndexedTransaction<Tx>>
where
@@ -591,14 +596,18 @@ fn multiproof_targets_from_state(state: EvmState) -> (MultiProofTargets, usize)
}
/// The events the pre-warm task can handle.
pub(super) enum PrewarmTaskEvent {
///
/// Generic over `R` (receipt type) to allow sharing `Arc<ExecutionOutcome<R>>` with the main
/// execution path without cloning the expensive `BundleState`.
pub(super) enum PrewarmTaskEvent<R> {
/// Forcefully terminate all remaining transaction execution.
TerminateTransactionExecution,
/// Forcefully terminate the task on demand and update the shared cache with the given output
/// before exiting.
Terminate {
/// The final block state output.
block_output: Option<BundleState>,
/// The final execution outcome. Using `Arc` allows sharing with the main execution
/// path without cloning the expensive `BundleState`.
execution_outcome: Option<Arc<ExecutionOutcome<R>>>,
},
/// The outcome of a pre-warm task
Outcome {

View File

@@ -549,17 +549,14 @@ where
.into())
}
// terminate prewarming task with good state output
handle.terminate_caching(Some(&output.state));
// Create ExecutionOutcome and wrap in Arc for sharing with both the caching task
// and the deferred trie task. This avoids cloning the expensive BundleState.
let execution_outcome = Arc::new(ExecutionOutcome::from((output, block_num_hash.number)));
Ok(self.spawn_deferred_trie_task(
block,
output,
block_num_hash.number,
&ctx,
hashed_state,
trie_output,
))
// Terminate prewarming task with the shared execution outcome
handle.terminate_caching(Some(Arc::clone(&execution_outcome)));
Ok(self.spawn_deferred_trie_task(block, execution_outcome, &ctx, hashed_state, trie_output))
}
/// Return sealed block header from database or in-memory state by hash.
@@ -602,7 +599,7 @@ where
state_provider: S,
env: ExecutionEnv<Evm>,
input: &BlockOrPayload<T>,
handle: &mut PayloadHandle<impl ExecutableTxFor<Evm>, Err>,
handle: &mut PayloadHandle<impl ExecutableTxFor<Evm>, Err, N::Receipt>,
) -> Result<(BlockExecutionOutput<N::Receipt>, Vec<Address>), InsertBlockErrorKind>
where
S: StateProvider,
@@ -790,6 +787,7 @@ where
PayloadHandle<
impl ExecutableTxFor<Evm> + use<N, P, Evm, V, T>,
impl core::error::Error + Send + Sync + 'static + use<N, P, Evm, V, T>,
N::Receipt,
>,
InsertBlockErrorKind,
> {
@@ -1024,8 +1022,7 @@ where
fn spawn_deferred_trie_task(
&self,
block: RecoveredBlock<N::Block>,
output: BlockExecutionOutput<N::Receipt>,
block_number: u64,
execution_outcome: Arc<ExecutionOutcome<N::Receipt>>,
ctx: &TreeCtx<'_, N>,
hashed_state: HashedPostState,
trie_output: TrieUpdates,
@@ -1075,7 +1072,7 @@ where
ExecutedBlock::with_deferred_trie_data(
Arc::new(block),
Arc::new(ExecutionOutcome::from((output, block_number))),
execution_outcome,
deferred_trie_data,
)
}