refactor(engine): move execution logic from metrics to payload_validator (#21226)

This commit is contained in:
Sergei Shulepov
2026-01-21 11:17:30 +00:00
committed by GitHub
parent 5a5c21cc1b
commit 43a84f1231
2 changed files with 168 additions and 340 deletions

View File

@@ -1,25 +1,15 @@
use crate::tree::{error::InsertBlockFatalError, MeteredStateHook, TreeOutcome};
use alloy_consensus::transaction::TxHashRef;
use alloy_evm::{
block::{BlockExecutor, ExecutableTx},
Evm,
};
use crate::tree::{error::InsertBlockFatalError, TreeOutcome};
use alloy_rpc_types_engine::{PayloadStatus, PayloadStatusEnum};
use core::borrow::BorrowMut;
use reth_engine_primitives::{ForkchoiceStatus, OnForkChoiceUpdated};
use reth_errors::{BlockExecutionError, ProviderError};
use reth_evm::{metrics::ExecutorMetrics, OnStateHook};
use reth_errors::ProviderError;
use reth_evm::metrics::ExecutorMetrics;
use reth_execution_types::BlockExecutionOutput;
use reth_metrics::{
metrics::{Counter, Gauge, Histogram},
Metrics,
};
use reth_primitives_traits::SignedTransaction;
use reth_trie::updates::TrieUpdates;
use revm::database::{states::bundle_state::BundleRetention, State};
use revm_primitives::Address;
use std::time::Instant;
use tracing::{debug_span, trace};
use std::time::{Duration, Instant};
/// Metrics for the `EngineApi`.
#[derive(Debug, Default)]
@@ -35,114 +25,24 @@ pub(crate) struct EngineApiMetrics {
}
impl EngineApiMetrics {
/// Helper function for metered execution
fn metered<F, R>(&self, f: F) -> R
where
F: FnOnce() -> (u64, R),
{
// Execute the block and record the elapsed time.
let execute_start = Instant::now();
let (gas_used, output) = f();
let execution_duration = execute_start.elapsed().as_secs_f64();
// Update gas metrics.
self.executor.gas_processed_total.increment(gas_used);
self.executor.gas_per_second.set(gas_used as f64 / execution_duration);
self.executor.gas_used_histogram.record(gas_used as f64);
self.executor.execution_histogram.record(execution_duration);
self.executor.execution_duration.set(execution_duration);
output
}
/// Execute the given block using the provided [`BlockExecutor`] and update metrics for the
/// execution.
/// Records metrics for block execution.
///
/// This method updates metrics for execution time, gas usage, and the number
/// of accounts, storage slots and bytecodes loaded and updated.
///
/// The optional `on_receipt` callback is invoked after each transaction with the receipt
/// index and a reference to all receipts collected so far. This allows callers to stream
/// receipts to a background task for incremental receipt root computation.
pub(crate) fn execute_metered<E, DB, F>(
/// of accounts, storage slots and bytecodes updated.
pub(crate) fn record_block_execution<R>(
&self,
executor: E,
mut transactions: impl Iterator<Item = Result<impl ExecutableTx<E>, BlockExecutionError>>,
transaction_count: usize,
state_hook: Box<dyn OnStateHook>,
mut on_receipt: F,
) -> Result<(BlockExecutionOutput<E::Receipt>, Vec<Address>), BlockExecutionError>
where
DB: alloy_evm::Database,
E: BlockExecutor<Evm: Evm<DB: BorrowMut<State<DB>>>, Transaction: SignedTransaction>,
F: FnMut(&[E::Receipt]),
{
// clone here is cheap, all the metrics are Option<Arc<_>>. additionally
// they are globally registered so that the data recorded in the hook will
// be accessible.
let wrapper = MeteredStateHook { metrics: self.executor.clone(), inner_hook: state_hook };
output: &BlockExecutionOutput<R>,
execution_duration: Duration,
) {
let execution_secs = execution_duration.as_secs_f64();
let gas_used = output.result.gas_used;
let mut senders = Vec::with_capacity(transaction_count);
let mut executor = executor.with_state_hook(Some(Box::new(wrapper)));
let f = || {
let start = Instant::now();
debug_span!(target: "engine::tree", "pre execution")
.entered()
.in_scope(|| executor.apply_pre_execution_changes())?;
self.executor.pre_execution_histogram.record(start.elapsed());
let exec_span = debug_span!(target: "engine::tree", "execution").entered();
loop {
let start = Instant::now();
let Some(tx) = transactions.next() else { break };
self.executor.transaction_wait_histogram.record(start.elapsed());
let tx = tx?;
senders.push(*tx.signer());
let span = debug_span!(
target: "engine::tree",
"execute tx",
tx_hash = ?tx.tx().tx_hash(),
gas_used = tracing::field::Empty,
);
let enter = span.entered();
trace!(target: "engine::tree", "Executing transaction");
let start = Instant::now();
let gas_used = executor.execute_transaction(tx)?;
self.executor.transaction_execution_histogram.record(start.elapsed());
// Invoke callback with the latest receipt
on_receipt(executor.receipts());
// record the tx gas used
enter.record("gas_used", gas_used);
}
drop(exec_span);
let start = Instant::now();
let result = debug_span!(target: "engine::tree", "finish")
.entered()
.in_scope(|| executor.finish())
.map(|(evm, result)| (evm.into_db(), result));
self.executor.post_execution_histogram.record(start.elapsed());
result
};
// Use metered to execute and track timing/gas metrics
let (mut db, result) = self.metered(|| {
let res = f();
let gas_used = res.as_ref().map(|r| r.1.gas_used).unwrap_or(0);
(gas_used, res)
})?;
// merge transitions into bundle state
debug_span!(target: "engine::tree", "merge transitions")
.entered()
.in_scope(|| db.borrow_mut().merge_transitions(BundleRetention::Reverts));
let output = BlockExecutionOutput { result, state: db.borrow_mut().take_bundle() };
// Update gas metrics
self.executor.gas_processed_total.increment(gas_used);
self.executor.gas_per_second.set(gas_used as f64 / execution_secs);
self.executor.gas_used_histogram.record(gas_used as f64);
self.executor.execution_histogram.record(execution_secs);
self.executor.execution_duration.set(execution_secs);
// Update the metrics for the number of accounts, storage slots and bytecodes updated
let accounts = output.state.state.len();
@@ -153,8 +53,31 @@ impl EngineApiMetrics {
self.executor.accounts_updated_histogram.record(accounts as f64);
self.executor.storage_slots_updated_histogram.record(storage_slots as f64);
self.executor.bytecodes_updated_histogram.record(bytecodes as f64);
}
Ok((output, senders))
/// Returns a reference to the executor metrics for use in state hooks.
pub(crate) const fn executor_metrics(&self) -> &ExecutorMetrics {
&self.executor
}
/// Records the duration of block pre-execution changes (e.g., beacon root update).
pub(crate) fn record_pre_execution(&self, elapsed: Duration) {
self.executor.pre_execution_histogram.record(elapsed);
}
/// Records the duration of block post-execution changes (e.g., finalization).
pub(crate) fn record_post_execution(&self, elapsed: Duration) {
self.executor.post_execution_histogram.record(elapsed);
}
/// Records the time spent waiting for the next transaction from the iterator.
pub(crate) fn record_transaction_wait(&self, elapsed: Duration) {
self.executor.transaction_wait_histogram.record(elapsed);
}
/// Records the duration of a single transaction execution.
pub(crate) fn record_transaction_execution(&self, elapsed: Duration) {
self.executor.transaction_execution_histogram.record(elapsed);
}
}
@@ -433,138 +356,10 @@ pub(crate) struct BlockBufferMetrics {
mod tests {
use super::*;
use alloy_eips::eip7685::Requests;
use alloy_evm::block::StateChangeSource;
use alloy_primitives::{B256, U256};
use metrics_util::debugging::{DebuggingRecorder, Snapshotter};
use reth_ethereum_primitives::{Receipt, TransactionSigned};
use reth_evm_ethereum::EthEvm;
use reth_ethereum_primitives::Receipt;
use reth_execution_types::BlockExecutionResult;
use reth_primitives_traits::RecoveredBlock;
use revm::{
context::result::{ExecutionResult, Output, ResultAndState, SuccessReason},
database::State,
database_interface::EmptyDB,
inspector::NoOpInspector,
state::{Account, AccountInfo, AccountStatus, EvmState, EvmStorage, EvmStorageSlot},
Context, MainBuilder, MainContext,
};
use revm_primitives::Bytes;
use std::sync::mpsc;
/// A simple mock executor for testing that doesn't require complex EVM setup
struct MockExecutor {
state: EvmState,
receipts: Vec<Receipt>,
hook: Option<Box<dyn OnStateHook>>,
}
impl MockExecutor {
fn new(state: EvmState) -> Self {
Self { state, receipts: vec![], hook: None }
}
}
// Mock Evm type for testing
type MockEvm = EthEvm<State<EmptyDB>, NoOpInspector>;
impl BlockExecutor for MockExecutor {
type Transaction = TransactionSigned;
type Receipt = Receipt;
type Evm = MockEvm;
fn apply_pre_execution_changes(&mut self) -> Result<(), BlockExecutionError> {
Ok(())
}
fn execute_transaction_without_commit(
&mut self,
_tx: impl ExecutableTx<Self>,
) -> Result<ResultAndState<<Self::Evm as Evm>::HaltReason>, BlockExecutionError> {
// Call hook with our mock state for each transaction
if let Some(hook) = self.hook.as_mut() {
hook.on_state(StateChangeSource::Transaction(0), &self.state);
}
Ok(ResultAndState::new(
ExecutionResult::Success {
reason: SuccessReason::Return,
gas_used: 1000, // Mock gas used
gas_refunded: 0,
logs: vec![],
output: Output::Call(Bytes::from(vec![])),
},
Default::default(),
))
}
fn commit_transaction(
&mut self,
_output: ResultAndState<<Self::Evm as Evm>::HaltReason>,
_tx: impl ExecutableTx<Self>,
) -> Result<u64, BlockExecutionError> {
Ok(1000)
}
fn finish(
self,
) -> Result<(Self::Evm, BlockExecutionResult<Self::Receipt>), BlockExecutionError> {
let Self { hook, state, .. } = self;
// Call hook with our mock state
if let Some(mut hook) = hook {
hook.on_state(StateChangeSource::Transaction(0), &state);
}
// Create a mock EVM
let db = State::builder()
.with_database(EmptyDB::default())
.with_bundle_update()
.without_state_clear()
.build();
let evm = EthEvm::new(
Context::mainnet().with_db(db).build_mainnet_with_inspector(NoOpInspector {}),
false,
);
// Return successful result like the original tests
Ok((
evm,
BlockExecutionResult {
receipts: vec![],
requests: Requests::default(),
gas_used: 1000,
blob_gas_used: 0,
},
))
}
fn set_state_hook(&mut self, hook: Option<Box<dyn OnStateHook>>) {
self.hook = hook;
}
fn evm_mut(&mut self) -> &mut Self::Evm {
panic!("Mock executor evm_mut() not implemented")
}
fn evm(&self) -> &Self::Evm {
panic!("Mock executor evm() not implemented")
}
fn receipts(&self) -> &[Self::Receipt] {
&self.receipts
}
}
struct ChannelStateHook {
output: i32,
sender: mpsc::Sender<i32>,
}
impl OnStateHook for ChannelStateHook {
fn on_state(&mut self, _source: StateChangeSource, _state: &EvmState) {
let _ = self.sender.send(self.output);
}
}
use reth_revm::db::BundleState;
fn setup_test_recorder() -> Snapshotter {
let recorder = DebuggingRecorder::new();
@@ -574,38 +369,7 @@ mod tests {
}
#[test]
fn test_executor_metrics_hook_called() {
let metrics = EngineApiMetrics::default();
let input = RecoveredBlock::<reth_ethereum_primitives::Block>::default();
let (tx, rx) = mpsc::channel();
let expected_output = 42;
let state_hook = Box::new(ChannelStateHook { sender: tx, output: expected_output });
let state = EvmState::default();
let executor = MockExecutor::new(state);
// This will fail to create the EVM but should still call the hook
let _result = metrics.execute_metered::<_, EmptyDB, _>(
executor,
input.clone_transactions_recovered().map(Ok::<_, BlockExecutionError>),
input.transaction_count(),
state_hook,
|_| {},
);
// Check if hook was called (it might not be if finish() fails early)
match rx.try_recv() {
Ok(actual_output) => assert_eq!(actual_output, expected_output),
Err(_) => {
// Hook wasn't called, which is expected if the mock fails early
// The test still validates that the code compiles and runs
}
}
}
#[test]
fn test_executor_metrics_hook_metrics_recorded() {
fn test_record_block_execution_metrics() {
let snapshotter = setup_test_recorder();
let metrics = EngineApiMetrics::default();
@@ -614,45 +378,17 @@ mod tests {
metrics.executor.gas_per_second.set(0.0);
metrics.executor.gas_used_histogram.record(0.0);
let input = RecoveredBlock::<reth_ethereum_primitives::Block>::default();
let (tx, _rx) = mpsc::channel();
let state_hook = Box::new(ChannelStateHook { sender: tx, output: 42 });
// Create a state with some data
let state = {
let mut state = EvmState::default();
let storage =
EvmStorage::from_iter([(U256::from(1), EvmStorageSlot::new(U256::from(2), 0))]);
state.insert(
Default::default(),
Account {
info: AccountInfo {
balance: U256::from(100),
nonce: 10,
code_hash: B256::random(),
code: Default::default(),
account_id: None,
},
original_info: Box::new(AccountInfo::default()),
storage,
status: AccountStatus::default(),
transaction_id: 0,
},
);
state
let output = BlockExecutionOutput::<Receipt> {
state: BundleState::default(),
result: BlockExecutionResult {
receipts: vec![],
requests: Requests::default(),
gas_used: 21000,
blob_gas_used: 0,
},
};
let executor = MockExecutor::new(state);
// Execute (will fail but should still update some metrics)
let _result = metrics.execute_metered::<_, EmptyDB, _>(
executor,
input.clone_transactions_recovered().map(Ok::<_, BlockExecutionError>),
input.transaction_count(),
state_hook,
|_| {},
);
metrics.record_block_execution(&output, Duration::from_millis(100));
let snapshot = snapshotter.snapshot().into_vec();

View File

@@ -7,10 +7,10 @@ use crate::tree::{
payload_processor::{executor::WorkloadExecutor, PayloadProcessor},
precompile_cache::{CachedPrecompile, CachedPrecompileMetrics, PrecompileCacheMap},
sparse_trie::StateRootComputeOutcome,
EngineApiMetrics, EngineApiTreeState, ExecutionEnv, PayloadHandle, StateProviderBuilder,
StateProviderDatabase, TreeConfig,
EngineApiMetrics, EngineApiTreeState, ExecutionEnv, MeteredStateHook, PayloadHandle,
StateProviderBuilder, StateProviderDatabase, TreeConfig,
};
use alloy_consensus::transaction::Either;
use alloy_consensus::transaction::{Either, TxHashRef};
use alloy_eip7928::BlockAccessList;
use alloy_eips::{eip1898::BlockWithParent, NumHash};
use alloy_evm::Evm;
@@ -41,7 +41,7 @@ use reth_provider::{
ProviderError, PruneCheckpointReader, StageCheckpointReader, StateProvider,
StateProviderFactory, StateReader,
};
use reth_revm::db::State;
use reth_revm::db::{states::bundle_state::BundleRetention, State};
use reth_trie::{updates::TrieUpdates, HashedPostState, StateRoot};
use reth_trie_db::ChangesetCache;
use reth_trie_parallel::root::{ParallelStateRoot, ParallelStateRootError};
@@ -638,7 +638,13 @@ where
Ok(())
}
/// Executes a block with the given state provider
/// Executes a block with the given state provider.
///
/// This method orchestrates block execution:
/// 1. Sets up the EVM with state database and precompile caching
/// 2. Spawns a background task for incremental receipt root computation
/// 3. Executes transactions with metrics collection via state hooks
/// 4. Merges state transitions and records execution metrics
#[instrument(level = "debug", target = "engine::tree::payload_validator", skip_all)]
#[expect(clippy::type_complexity)]
fn execute_block<S, Err, T>(
@@ -701,31 +707,117 @@ where
let task_handle = ReceiptRootTaskHandle::new(receipt_rx, result_tx);
self.payload_processor.executor().spawn_blocking(move || task_handle.run(receipts_len));
// Wrap the state hook with metrics collection
let inner_hook = Box::new(handle.state_hook());
let state_hook =
MeteredStateHook { metrics: self.metrics.executor_metrics().clone(), inner_hook };
let transaction_count = input.transaction_count();
let executor = executor.with_state_hook(Some(Box::new(state_hook)));
let execution_start = Instant::now();
let state_hook = Box::new(handle.state_hook());
let (output, senders) = self.metrics.execute_metered(
// Execute all transactions and finalize
let (executor, senders) = self.execute_transactions(
executor,
handle.iter_transactions().map(|res| res.map_err(BlockExecutionError::other)),
input.transaction_count(),
state_hook,
|receipts| {
// Send the latest receipt to the background task for incremental root computation.
// The receipt is cloned here; encoding happens in the background thread.
if let Some(receipt) = receipts.last() {
// Infer tx_index from the number of receipts collected so far
let tx_index = receipts.len() - 1;
let _ = receipt_tx.send(IndexedReceipt::new(tx_index, receipt.clone()));
}
},
transaction_count,
handle.iter_transactions(),
&receipt_tx,
)?;
drop(receipt_tx);
let execution_finish = Instant::now();
let execution_time = execution_finish.duration_since(execution_start);
debug!(target: "engine::tree::payload_validator", elapsed = ?execution_time, "Executed block");
// Finish execution and get the result
let post_exec_start = Instant::now();
let (_evm, result) = debug_span!(target: "engine::tree", "finish")
.in_scope(|| executor.finish())
.map(|(evm, result)| (evm.into_db(), result))?;
self.metrics.record_post_execution(post_exec_start.elapsed());
// Merge transitions into bundle state
debug_span!(target: "engine::tree", "merge transitions")
.in_scope(|| db.merge_transitions(BundleRetention::Reverts));
let output = BlockExecutionOutput { result, state: db.take_bundle() };
let execution_duration = execution_start.elapsed();
self.metrics.record_block_execution(&output, execution_duration);
debug!(target: "engine::tree::payload_validator", elapsed = ?execution_duration, "Executed block");
Ok((output, senders, result_rx))
}
/// Executes transactions and collects senders, streaming receipts to a background task.
///
/// This method handles:
/// - Applying pre-execution changes (e.g., beacon root updates)
/// - Executing each transaction with timing metrics
/// - Streaming receipts to the receipt root computation task
/// - Collecting transaction senders for later use
///
/// Returns the executor (for finalization) and the collected senders.
fn execute_transactions<E, Tx, InnerTx, Err>(
&self,
mut executor: E,
transaction_count: usize,
transactions: impl Iterator<Item = Result<Tx, Err>>,
receipt_tx: &crossbeam_channel::Sender<IndexedReceipt<N::Receipt>>,
) -> Result<(E, Vec<Address>), BlockExecutionError>
where
E: BlockExecutor<Receipt = N::Receipt>,
Tx: alloy_evm::block::ExecutableTx<E> + alloy_evm::RecoveredTx<InnerTx>,
InnerTx: TxHashRef,
Err: core::error::Error + Send + Sync + 'static,
{
let mut senders = Vec::with_capacity(transaction_count);
// Apply pre-execution changes (e.g., beacon root update)
let pre_exec_start = Instant::now();
debug_span!(target: "engine::tree", "pre execution")
.in_scope(|| executor.apply_pre_execution_changes())?;
self.metrics.record_pre_execution(pre_exec_start.elapsed());
// Execute transactions
let exec_span = debug_span!(target: "engine::tree", "execution").entered();
let mut transactions = transactions.into_iter();
loop {
// Measure time spent waiting for next transaction from iterator
// (e.g., parallel signature recovery)
let wait_start = Instant::now();
let Some(tx_result) = transactions.next() else { break };
self.metrics.record_transaction_wait(wait_start.elapsed());
let tx = tx_result.map_err(BlockExecutionError::other)?;
let tx_signer = *<Tx as alloy_evm::RecoveredTx<InnerTx>>::signer(&tx);
let tx_hash = <Tx as alloy_evm::RecoveredTx<InnerTx>>::tx(&tx).tx_hash();
senders.push(tx_signer);
let span = debug_span!(
target: "engine::tree",
"execute tx",
?tx_hash,
gas_used = tracing::field::Empty,
);
let enter = span.entered();
trace!(target: "engine::tree", "Executing transaction");
let tx_start = Instant::now();
let gas_used = executor.execute_transaction(tx)?;
self.metrics.record_transaction_execution(tx_start.elapsed());
// Send the latest receipt to the background task for incremental root computation
if let Some(receipt) = executor.receipts().last() {
let tx_index = executor.receipts().len() - 1;
let _ = receipt_tx.send(IndexedReceipt::new(tx_index, receipt.clone()));
}
enter.record("gas_used", gas_used);
}
drop(exec_span);
Ok((executor, senders))
}
/// Compute state root for the given hashed post state in parallel.
///
/// Uses an overlay factory which provides the state of the parent block, along with the