diff --git a/crates/engine/tree/src/tree/metrics.rs b/crates/engine/tree/src/tree/metrics.rs index 3d05cee8e0..ea17ff2314 100644 --- a/crates/engine/tree/src/tree/metrics.rs +++ b/crates/engine/tree/src/tree/metrics.rs @@ -1,25 +1,15 @@ -use crate::tree::{error::InsertBlockFatalError, MeteredStateHook, TreeOutcome}; -use alloy_consensus::transaction::TxHashRef; -use alloy_evm::{ - block::{BlockExecutor, ExecutableTx}, - Evm, -}; +use crate::tree::{error::InsertBlockFatalError, TreeOutcome}; use alloy_rpc_types_engine::{PayloadStatus, PayloadStatusEnum}; -use core::borrow::BorrowMut; use reth_engine_primitives::{ForkchoiceStatus, OnForkChoiceUpdated}; -use reth_errors::{BlockExecutionError, ProviderError}; -use reth_evm::{metrics::ExecutorMetrics, OnStateHook}; +use reth_errors::ProviderError; +use reth_evm::metrics::ExecutorMetrics; use reth_execution_types::BlockExecutionOutput; use reth_metrics::{ metrics::{Counter, Gauge, Histogram}, Metrics, }; -use reth_primitives_traits::SignedTransaction; use reth_trie::updates::TrieUpdates; -use revm::database::{states::bundle_state::BundleRetention, State}; -use revm_primitives::Address; -use std::time::Instant; -use tracing::{debug_span, trace}; +use std::time::{Duration, Instant}; /// Metrics for the `EngineApi`. #[derive(Debug, Default)] @@ -35,114 +25,24 @@ pub(crate) struct EngineApiMetrics { } impl EngineApiMetrics { - /// Helper function for metered execution - fn metered(&self, f: F) -> R - where - F: FnOnce() -> (u64, R), - { - // Execute the block and record the elapsed time. - let execute_start = Instant::now(); - let (gas_used, output) = f(); - let execution_duration = execute_start.elapsed().as_secs_f64(); - - // Update gas metrics. - self.executor.gas_processed_total.increment(gas_used); - self.executor.gas_per_second.set(gas_used as f64 / execution_duration); - self.executor.gas_used_histogram.record(gas_used as f64); - self.executor.execution_histogram.record(execution_duration); - self.executor.execution_duration.set(execution_duration); - - output - } - - /// Execute the given block using the provided [`BlockExecutor`] and update metrics for the - /// execution. + /// Records metrics for block execution. /// /// This method updates metrics for execution time, gas usage, and the number - /// of accounts, storage slots and bytecodes loaded and updated. - /// - /// The optional `on_receipt` callback is invoked after each transaction with the receipt - /// index and a reference to all receipts collected so far. This allows callers to stream - /// receipts to a background task for incremental receipt root computation. - pub(crate) fn execute_metered( + /// of accounts, storage slots and bytecodes updated. + pub(crate) fn record_block_execution( &self, - executor: E, - mut transactions: impl Iterator, BlockExecutionError>>, - transaction_count: usize, - state_hook: Box, - mut on_receipt: F, - ) -> Result<(BlockExecutionOutput, Vec
), BlockExecutionError> - where - DB: alloy_evm::Database, - E: BlockExecutor>>, Transaction: SignedTransaction>, - F: FnMut(&[E::Receipt]), - { - // clone here is cheap, all the metrics are Option>. additionally - // they are globally registered so that the data recorded in the hook will - // be accessible. - let wrapper = MeteredStateHook { metrics: self.executor.clone(), inner_hook: state_hook }; + output: &BlockExecutionOutput, + execution_duration: Duration, + ) { + let execution_secs = execution_duration.as_secs_f64(); + let gas_used = output.result.gas_used; - let mut senders = Vec::with_capacity(transaction_count); - let mut executor = executor.with_state_hook(Some(Box::new(wrapper))); - - let f = || { - let start = Instant::now(); - debug_span!(target: "engine::tree", "pre execution") - .entered() - .in_scope(|| executor.apply_pre_execution_changes())?; - self.executor.pre_execution_histogram.record(start.elapsed()); - - let exec_span = debug_span!(target: "engine::tree", "execution").entered(); - loop { - let start = Instant::now(); - let Some(tx) = transactions.next() else { break }; - self.executor.transaction_wait_histogram.record(start.elapsed()); - - let tx = tx?; - senders.push(*tx.signer()); - - let span = debug_span!( - target: "engine::tree", - "execute tx", - tx_hash = ?tx.tx().tx_hash(), - gas_used = tracing::field::Empty, - ); - let enter = span.entered(); - trace!(target: "engine::tree", "Executing transaction"); - let start = Instant::now(); - let gas_used = executor.execute_transaction(tx)?; - self.executor.transaction_execution_histogram.record(start.elapsed()); - - // Invoke callback with the latest receipt - on_receipt(executor.receipts()); - - // record the tx gas used - enter.record("gas_used", gas_used); - } - drop(exec_span); - - let start = Instant::now(); - let result = debug_span!(target: "engine::tree", "finish") - .entered() - .in_scope(|| executor.finish()) - .map(|(evm, result)| (evm.into_db(), result)); - self.executor.post_execution_histogram.record(start.elapsed()); - - result - }; - - // Use metered to execute and track timing/gas metrics - let (mut db, result) = self.metered(|| { - let res = f(); - let gas_used = res.as_ref().map(|r| r.1.gas_used).unwrap_or(0); - (gas_used, res) - })?; - - // merge transitions into bundle state - debug_span!(target: "engine::tree", "merge transitions") - .entered() - .in_scope(|| db.borrow_mut().merge_transitions(BundleRetention::Reverts)); - let output = BlockExecutionOutput { result, state: db.borrow_mut().take_bundle() }; + // Update gas metrics + self.executor.gas_processed_total.increment(gas_used); + self.executor.gas_per_second.set(gas_used as f64 / execution_secs); + self.executor.gas_used_histogram.record(gas_used as f64); + self.executor.execution_histogram.record(execution_secs); + self.executor.execution_duration.set(execution_secs); // Update the metrics for the number of accounts, storage slots and bytecodes updated let accounts = output.state.state.len(); @@ -153,8 +53,31 @@ impl EngineApiMetrics { self.executor.accounts_updated_histogram.record(accounts as f64); self.executor.storage_slots_updated_histogram.record(storage_slots as f64); self.executor.bytecodes_updated_histogram.record(bytecodes as f64); + } - Ok((output, senders)) + /// Returns a reference to the executor metrics for use in state hooks. + pub(crate) const fn executor_metrics(&self) -> &ExecutorMetrics { + &self.executor + } + + /// Records the duration of block pre-execution changes (e.g., beacon root update). + pub(crate) fn record_pre_execution(&self, elapsed: Duration) { + self.executor.pre_execution_histogram.record(elapsed); + } + + /// Records the duration of block post-execution changes (e.g., finalization). + pub(crate) fn record_post_execution(&self, elapsed: Duration) { + self.executor.post_execution_histogram.record(elapsed); + } + + /// Records the time spent waiting for the next transaction from the iterator. + pub(crate) fn record_transaction_wait(&self, elapsed: Duration) { + self.executor.transaction_wait_histogram.record(elapsed); + } + + /// Records the duration of a single transaction execution. + pub(crate) fn record_transaction_execution(&self, elapsed: Duration) { + self.executor.transaction_execution_histogram.record(elapsed); } } @@ -433,138 +356,10 @@ pub(crate) struct BlockBufferMetrics { mod tests { use super::*; use alloy_eips::eip7685::Requests; - use alloy_evm::block::StateChangeSource; - use alloy_primitives::{B256, U256}; use metrics_util::debugging::{DebuggingRecorder, Snapshotter}; - use reth_ethereum_primitives::{Receipt, TransactionSigned}; - use reth_evm_ethereum::EthEvm; + use reth_ethereum_primitives::Receipt; use reth_execution_types::BlockExecutionResult; - use reth_primitives_traits::RecoveredBlock; - use revm::{ - context::result::{ExecutionResult, Output, ResultAndState, SuccessReason}, - database::State, - database_interface::EmptyDB, - inspector::NoOpInspector, - state::{Account, AccountInfo, AccountStatus, EvmState, EvmStorage, EvmStorageSlot}, - Context, MainBuilder, MainContext, - }; - use revm_primitives::Bytes; - use std::sync::mpsc; - - /// A simple mock executor for testing that doesn't require complex EVM setup - struct MockExecutor { - state: EvmState, - receipts: Vec, - hook: Option>, - } - - impl MockExecutor { - fn new(state: EvmState) -> Self { - Self { state, receipts: vec![], hook: None } - } - } - - // Mock Evm type for testing - type MockEvm = EthEvm, NoOpInspector>; - - impl BlockExecutor for MockExecutor { - type Transaction = TransactionSigned; - type Receipt = Receipt; - type Evm = MockEvm; - - fn apply_pre_execution_changes(&mut self) -> Result<(), BlockExecutionError> { - Ok(()) - } - - fn execute_transaction_without_commit( - &mut self, - _tx: impl ExecutableTx, - ) -> Result::HaltReason>, BlockExecutionError> { - // Call hook with our mock state for each transaction - if let Some(hook) = self.hook.as_mut() { - hook.on_state(StateChangeSource::Transaction(0), &self.state); - } - - Ok(ResultAndState::new( - ExecutionResult::Success { - reason: SuccessReason::Return, - gas_used: 1000, // Mock gas used - gas_refunded: 0, - logs: vec![], - output: Output::Call(Bytes::from(vec![])), - }, - Default::default(), - )) - } - - fn commit_transaction( - &mut self, - _output: ResultAndState<::HaltReason>, - _tx: impl ExecutableTx, - ) -> Result { - Ok(1000) - } - - fn finish( - self, - ) -> Result<(Self::Evm, BlockExecutionResult), BlockExecutionError> { - let Self { hook, state, .. } = self; - - // Call hook with our mock state - if let Some(mut hook) = hook { - hook.on_state(StateChangeSource::Transaction(0), &state); - } - - // Create a mock EVM - let db = State::builder() - .with_database(EmptyDB::default()) - .with_bundle_update() - .without_state_clear() - .build(); - let evm = EthEvm::new( - Context::mainnet().with_db(db).build_mainnet_with_inspector(NoOpInspector {}), - false, - ); - - // Return successful result like the original tests - Ok(( - evm, - BlockExecutionResult { - receipts: vec![], - requests: Requests::default(), - gas_used: 1000, - blob_gas_used: 0, - }, - )) - } - - fn set_state_hook(&mut self, hook: Option>) { - self.hook = hook; - } - - fn evm_mut(&mut self) -> &mut Self::Evm { - panic!("Mock executor evm_mut() not implemented") - } - - fn evm(&self) -> &Self::Evm { - panic!("Mock executor evm() not implemented") - } - - fn receipts(&self) -> &[Self::Receipt] { - &self.receipts - } - } - - struct ChannelStateHook { - output: i32, - sender: mpsc::Sender, - } - - impl OnStateHook for ChannelStateHook { - fn on_state(&mut self, _source: StateChangeSource, _state: &EvmState) { - let _ = self.sender.send(self.output); - } - } + use reth_revm::db::BundleState; fn setup_test_recorder() -> Snapshotter { let recorder = DebuggingRecorder::new(); @@ -574,38 +369,7 @@ mod tests { } #[test] - fn test_executor_metrics_hook_called() { - let metrics = EngineApiMetrics::default(); - let input = RecoveredBlock::::default(); - - let (tx, rx) = mpsc::channel(); - let expected_output = 42; - let state_hook = Box::new(ChannelStateHook { sender: tx, output: expected_output }); - - let state = EvmState::default(); - let executor = MockExecutor::new(state); - - // This will fail to create the EVM but should still call the hook - let _result = metrics.execute_metered::<_, EmptyDB, _>( - executor, - input.clone_transactions_recovered().map(Ok::<_, BlockExecutionError>), - input.transaction_count(), - state_hook, - |_| {}, - ); - - // Check if hook was called (it might not be if finish() fails early) - match rx.try_recv() { - Ok(actual_output) => assert_eq!(actual_output, expected_output), - Err(_) => { - // Hook wasn't called, which is expected if the mock fails early - // The test still validates that the code compiles and runs - } - } - } - - #[test] - fn test_executor_metrics_hook_metrics_recorded() { + fn test_record_block_execution_metrics() { let snapshotter = setup_test_recorder(); let metrics = EngineApiMetrics::default(); @@ -614,45 +378,17 @@ mod tests { metrics.executor.gas_per_second.set(0.0); metrics.executor.gas_used_histogram.record(0.0); - let input = RecoveredBlock::::default(); - - let (tx, _rx) = mpsc::channel(); - let state_hook = Box::new(ChannelStateHook { sender: tx, output: 42 }); - - // Create a state with some data - let state = { - let mut state = EvmState::default(); - let storage = - EvmStorage::from_iter([(U256::from(1), EvmStorageSlot::new(U256::from(2), 0))]); - state.insert( - Default::default(), - Account { - info: AccountInfo { - balance: U256::from(100), - nonce: 10, - code_hash: B256::random(), - code: Default::default(), - account_id: None, - }, - original_info: Box::new(AccountInfo::default()), - storage, - status: AccountStatus::default(), - transaction_id: 0, - }, - ); - state + let output = BlockExecutionOutput:: { + state: BundleState::default(), + result: BlockExecutionResult { + receipts: vec![], + requests: Requests::default(), + gas_used: 21000, + blob_gas_used: 0, + }, }; - let executor = MockExecutor::new(state); - - // Execute (will fail but should still update some metrics) - let _result = metrics.execute_metered::<_, EmptyDB, _>( - executor, - input.clone_transactions_recovered().map(Ok::<_, BlockExecutionError>), - input.transaction_count(), - state_hook, - |_| {}, - ); + metrics.record_block_execution(&output, Duration::from_millis(100)); let snapshot = snapshotter.snapshot().into_vec(); diff --git a/crates/engine/tree/src/tree/payload_validator.rs b/crates/engine/tree/src/tree/payload_validator.rs index 573962a8e5..e02fb2f9e4 100644 --- a/crates/engine/tree/src/tree/payload_validator.rs +++ b/crates/engine/tree/src/tree/payload_validator.rs @@ -7,10 +7,10 @@ use crate::tree::{ payload_processor::{executor::WorkloadExecutor, PayloadProcessor}, precompile_cache::{CachedPrecompile, CachedPrecompileMetrics, PrecompileCacheMap}, sparse_trie::StateRootComputeOutcome, - EngineApiMetrics, EngineApiTreeState, ExecutionEnv, PayloadHandle, StateProviderBuilder, - StateProviderDatabase, TreeConfig, + EngineApiMetrics, EngineApiTreeState, ExecutionEnv, MeteredStateHook, PayloadHandle, + StateProviderBuilder, StateProviderDatabase, TreeConfig, }; -use alloy_consensus::transaction::Either; +use alloy_consensus::transaction::{Either, TxHashRef}; use alloy_eip7928::BlockAccessList; use alloy_eips::{eip1898::BlockWithParent, NumHash}; use alloy_evm::Evm; @@ -41,7 +41,7 @@ use reth_provider::{ ProviderError, PruneCheckpointReader, StageCheckpointReader, StateProvider, StateProviderFactory, StateReader, }; -use reth_revm::db::State; +use reth_revm::db::{states::bundle_state::BundleRetention, State}; use reth_trie::{updates::TrieUpdates, HashedPostState, StateRoot}; use reth_trie_db::ChangesetCache; use reth_trie_parallel::root::{ParallelStateRoot, ParallelStateRootError}; @@ -638,7 +638,13 @@ where Ok(()) } - /// Executes a block with the given state provider + /// Executes a block with the given state provider. + /// + /// This method orchestrates block execution: + /// 1. Sets up the EVM with state database and precompile caching + /// 2. Spawns a background task for incremental receipt root computation + /// 3. Executes transactions with metrics collection via state hooks + /// 4. Merges state transitions and records execution metrics #[instrument(level = "debug", target = "engine::tree::payload_validator", skip_all)] #[expect(clippy::type_complexity)] fn execute_block( @@ -701,31 +707,117 @@ where let task_handle = ReceiptRootTaskHandle::new(receipt_rx, result_tx); self.payload_processor.executor().spawn_blocking(move || task_handle.run(receipts_len)); + // Wrap the state hook with metrics collection + let inner_hook = Box::new(handle.state_hook()); + let state_hook = + MeteredStateHook { metrics: self.metrics.executor_metrics().clone(), inner_hook }; + + let transaction_count = input.transaction_count(); + let executor = executor.with_state_hook(Some(Box::new(state_hook))); + let execution_start = Instant::now(); - let state_hook = Box::new(handle.state_hook()); - let (output, senders) = self.metrics.execute_metered( + + // Execute all transactions and finalize + let (executor, senders) = self.execute_transactions( executor, - handle.iter_transactions().map(|res| res.map_err(BlockExecutionError::other)), - input.transaction_count(), - state_hook, - |receipts| { - // Send the latest receipt to the background task for incremental root computation. - // The receipt is cloned here; encoding happens in the background thread. - if let Some(receipt) = receipts.last() { - // Infer tx_index from the number of receipts collected so far - let tx_index = receipts.len() - 1; - let _ = receipt_tx.send(IndexedReceipt::new(tx_index, receipt.clone())); - } - }, + transaction_count, + handle.iter_transactions(), + &receipt_tx, )?; drop(receipt_tx); - let execution_finish = Instant::now(); - let execution_time = execution_finish.duration_since(execution_start); - debug!(target: "engine::tree::payload_validator", elapsed = ?execution_time, "Executed block"); + // Finish execution and get the result + let post_exec_start = Instant::now(); + let (_evm, result) = debug_span!(target: "engine::tree", "finish") + .in_scope(|| executor.finish()) + .map(|(evm, result)| (evm.into_db(), result))?; + self.metrics.record_post_execution(post_exec_start.elapsed()); + + // Merge transitions into bundle state + debug_span!(target: "engine::tree", "merge transitions") + .in_scope(|| db.merge_transitions(BundleRetention::Reverts)); + + let output = BlockExecutionOutput { result, state: db.take_bundle() }; + + let execution_duration = execution_start.elapsed(); + self.metrics.record_block_execution(&output, execution_duration); + + debug!(target: "engine::tree::payload_validator", elapsed = ?execution_duration, "Executed block"); Ok((output, senders, result_rx)) } + /// Executes transactions and collects senders, streaming receipts to a background task. + /// + /// This method handles: + /// - Applying pre-execution changes (e.g., beacon root updates) + /// - Executing each transaction with timing metrics + /// - Streaming receipts to the receipt root computation task + /// - Collecting transaction senders for later use + /// + /// Returns the executor (for finalization) and the collected senders. + fn execute_transactions( + &self, + mut executor: E, + transaction_count: usize, + transactions: impl Iterator>, + receipt_tx: &crossbeam_channel::Sender>, + ) -> Result<(E, Vec
), BlockExecutionError> + where + E: BlockExecutor, + Tx: alloy_evm::block::ExecutableTx + alloy_evm::RecoveredTx, + InnerTx: TxHashRef, + Err: core::error::Error + Send + Sync + 'static, + { + let mut senders = Vec::with_capacity(transaction_count); + + // Apply pre-execution changes (e.g., beacon root update) + let pre_exec_start = Instant::now(); + debug_span!(target: "engine::tree", "pre execution") + .in_scope(|| executor.apply_pre_execution_changes())?; + self.metrics.record_pre_execution(pre_exec_start.elapsed()); + + // Execute transactions + let exec_span = debug_span!(target: "engine::tree", "execution").entered(); + let mut transactions = transactions.into_iter(); + loop { + // Measure time spent waiting for next transaction from iterator + // (e.g., parallel signature recovery) + let wait_start = Instant::now(); + let Some(tx_result) = transactions.next() else { break }; + self.metrics.record_transaction_wait(wait_start.elapsed()); + + let tx = tx_result.map_err(BlockExecutionError::other)?; + let tx_signer = *>::signer(&tx); + let tx_hash = >::tx(&tx).tx_hash(); + + senders.push(tx_signer); + + let span = debug_span!( + target: "engine::tree", + "execute tx", + ?tx_hash, + gas_used = tracing::field::Empty, + ); + let enter = span.entered(); + trace!(target: "engine::tree", "Executing transaction"); + + let tx_start = Instant::now(); + let gas_used = executor.execute_transaction(tx)?; + self.metrics.record_transaction_execution(tx_start.elapsed()); + + // Send the latest receipt to the background task for incremental root computation + if let Some(receipt) = executor.receipts().last() { + let tx_index = executor.receipts().len() - 1; + let _ = receipt_tx.send(IndexedReceipt::new(tx_index, receipt.clone())); + } + + enter.record("gas_used", gas_used); + } + drop(exec_span); + + Ok((executor, senders)) + } + /// Compute state root for the given hashed post state in parallel. /// /// Uses an overlay factory which provides the state of the parent block, along with the