From 098fa7f6119b332eb64b3fa2e2b4bd9414dbf5c9 Mon Sep 17 00:00:00 2001 From: Federico Gimenez Date: Wed, 6 Nov 2024 12:38:29 +0100 Subject: [PATCH] feat(engine): use execute_with_state_hook in ExecutorMetrics (#12316) --- Cargo.lock | 12 ++ crates/engine/tree/src/tree/mod.rs | 10 +- crates/evm/Cargo.toml | 1 + crates/evm/src/metrics.rs | 241 ++++++++++++++++++++++++++--- 4 files changed, 238 insertions(+), 26 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0f6a62647a..f70611f7a5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4863,7 +4863,9 @@ dependencies = [ "crossbeam-epoch", "crossbeam-utils", "hashbrown 0.15.0", + "indexmap 2.6.0", "metrics", + "ordered-float", "quanta", "sketches-ddsketch", ] @@ -5404,6 +5406,15 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "04744f49eae99ab78e0d5c0b603ab218f515ea8cfe5a456d7629ad883a3b6e7d" +[[package]] +name = "ordered-float" +version = "4.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "44d501f1a72f71d3c063a6bbc8f7271fa73aa09fe5d6283b6571e2ed176a2537" +dependencies = [ + "num-traits", +] + [[package]] name = "overload" version = "0.1.1" @@ -7440,6 +7451,7 @@ dependencies = [ "auto_impl", "futures-util", "metrics", + "metrics-util", "parking_lot", "reth-chainspec", "reth-consensus", diff --git a/crates/engine/tree/src/tree/mod.rs b/crates/engine/tree/src/tree/mod.rs index c3e922d11c..36108e63bf 100644 --- a/crates/engine/tree/src/tree/mod.rs +++ b/crates/engine/tree/src/tree/mod.rs @@ -44,6 +44,7 @@ use reth_revm::database::StateProviderDatabase; use reth_stages_api::ControlFlow; use reth_trie::{updates::TrieUpdates, HashedPostState, TrieInput}; use reth_trie_parallel::parallel_root::{ParallelStateRoot, ParallelStateRootError}; +use revm_primitives::ResultAndState; use std::{ cmp::Ordering, collections::{btree_map, hash_map, BTreeMap, VecDeque}, @@ -2187,7 +2188,14 @@ where let block = block.unseal(); let exec_time = Instant::now(); - let output = self.metrics.executor.execute_metered(executor, (&block, U256::MAX).into())?; + // TODO: create StateRootTask with the receiving end of a channel and + // pass the sending end of the channel to the state hook. + let noop_state_hook = |_result_and_state: &ResultAndState| {}; + let output = self.metrics.executor.execute_metered( + executor, + (&block, U256::MAX).into(), + Box::new(noop_state_hook), + )?; trace!(target: "engine::tree", elapsed=?exec_time.elapsed(), ?block_number, "Executed block"); if let Err(err) = self.consensus.validate_block_post_execution( diff --git a/crates/evm/Cargo.toml b/crates/evm/Cargo.toml index a4ce3c3893..c895110209 100644 --- a/crates/evm/Cargo.toml +++ b/crates/evm/Cargo.toml @@ -40,6 +40,7 @@ parking_lot = { workspace = true, optional = true } parking_lot.workspace = true reth-ethereum-forks.workspace = true alloy-consensus.workspace = true +metrics-util = { workspace = true, features = ["debugging"] } [features] default = ["std"] diff --git a/crates/evm/src/metrics.rs b/crates/evm/src/metrics.rs index fbb2b858b1..3464bb96f4 100644 --- a/crates/evm/src/metrics.rs +++ b/crates/evm/src/metrics.rs @@ -2,14 +2,41 @@ //! //! Block processing related to syncing should take care to update the metrics by using either //! [`ExecutorMetrics::execute_metered`] or [`ExecutorMetrics::metered_one`]. -use std::time::Instant; - +use crate::{execute::Executor, system_calls::OnStateHook}; use metrics::{Counter, Gauge, Histogram}; use reth_execution_types::{BlockExecutionInput, BlockExecutionOutput}; use reth_metrics::Metrics; use reth_primitives::BlockWithSenders; +use revm_primitives::ResultAndState; +use std::time::Instant; -use crate::execute::Executor; +/// Wrapper struct that combines metrics and state hook +struct MeteredStateHook { + metrics: ExecutorMetrics, + inner_hook: Box, +} + +impl OnStateHook for MeteredStateHook { + fn on_state(&mut self, result_and_state: &ResultAndState) { + // Update the metrics for the number of accounts, storage slots and bytecodes loaded + let accounts = result_and_state.state.keys().len(); + let storage_slots = + result_and_state.state.values().map(|account| account.storage.len()).sum::(); + let bytecodes = result_and_state + .state + .values() + .filter(|account| !account.info.is_empty_code_hash()) + .collect::>() + .len(); + + self.metrics.accounts_loaded_histogram.record(accounts as f64); + self.metrics.storage_slots_loaded_histogram.record(storage_slots as f64); + self.metrics.bytecodes_loaded_histogram.record(bytecodes as f64); + + // Call the original state hook + self.inner_hook.on_state(result_and_state); + } +} /// Executor metrics. // TODO(onbjerg): add sload/sstore @@ -65,10 +92,13 @@ impl ExecutorMetrics { /// /// Compared to [`Self::metered_one`], this method additionally updates metrics for the number /// of accounts, storage slots and bytecodes loaded and updated. + /// Execute the given block using the provided [`Executor`] and update metrics for the + /// execution. pub fn execute_metered<'a, E, DB, O, Error>( &self, executor: E, input: BlockExecutionInput<'a, BlockWithSenders>, + state_hook: Box, ) -> Result, Error> where E: Executor< @@ -78,29 +108,16 @@ impl ExecutorMetrics { Error = Error, >, { - let output = self.metered(input.block, || { - executor.execute_with_state_closure(input, |state: &revm::db::State| { - // Update the metrics for the number of accounts, storage slots and bytecodes - // loaded - let accounts = state.cache.accounts.len(); - let storage_slots = state - .cache - .accounts - .values() - .filter_map(|account| { - account.account.as_ref().map(|account| account.storage.len()) - }) - .sum::(); - let bytecodes = state.cache.contracts.len(); + // clone here is cheap, all the metrics are Option>. additionally + // they are gloally registered so that the data recorded in the hook will + // be accessible. + let wrapper = MeteredStateHook { metrics: self.clone(), inner_hook: state_hook }; - // Record all state present in the cache state as loaded even though some might have - // been newly created. - // TODO: Consider spitting these into loaded and newly created. - self.accounts_loaded_histogram.record(accounts as f64); - self.storage_slots_loaded_histogram.record(storage_slots as f64); - self.bytecodes_loaded_histogram.record(bytecodes as f64); - }) - })?; + // Store reference to block for metered + let block = input.block; + + // Use metered to execute and track timing/gas metrics + let output = self.metered(block, || executor.execute_with_state_hook(input, wrapper))?; // Update the metrics for the number of accounts, storage slots and bytecodes updated let accounts = output.state.state.len(); @@ -123,3 +140,177 @@ impl ExecutorMetrics { self.metered(input.block, || f(input)) } } + +#[cfg(test)] +mod tests { + use super::*; + use alloy_eips::eip7685::Requests; + use metrics_util::debugging::{DebugValue, DebuggingRecorder, Snapshotter}; + use revm::db::BundleState; + use revm_primitives::{ + Account, AccountInfo, AccountStatus, Bytes, EvmState, EvmStorage, EvmStorageSlot, + ExecutionResult, Output, SuccessReason, B256, U256, + }; + use std::sync::mpsc; + + /// A mock executor that simulates state changes + struct MockExecutor { + result_and_state: ResultAndState, + } + + impl Executor<()> for MockExecutor { + type Input<'a> + = BlockExecutionInput<'a, BlockWithSenders> + where + Self: 'a; + type Output = BlockExecutionOutput<()>; + type Error = std::convert::Infallible; + + fn execute(self, _input: Self::Input<'_>) -> Result { + Ok(BlockExecutionOutput { + state: BundleState::default(), + receipts: vec![], + requests: Requests::default(), + gas_used: 0, + }) + } + fn execute_with_state_closure( + self, + _input: Self::Input<'_>, + _state: F, + ) -> Result + where + F: FnMut(&revm::State<()>), + { + Ok(BlockExecutionOutput { + state: BundleState::default(), + receipts: vec![], + requests: Requests::default(), + gas_used: 0, + }) + } + fn execute_with_state_hook( + self, + _input: Self::Input<'_>, + mut hook: F, + ) -> Result + where + F: OnStateHook + 'static, + { + // Call hook with our mock state + hook.on_state(&self.result_and_state); + + Ok(BlockExecutionOutput { + state: BundleState::default(), + receipts: vec![], + requests: Requests::default(), + gas_used: 0, + }) + } + } + + struct ChannelStateHook { + output: i32, + sender: mpsc::Sender, + } + + impl OnStateHook for ChannelStateHook { + fn on_state(&mut self, _result_and_state: &ResultAndState) { + let _ = self.sender.send(self.output); + } + } + + fn setup_test_recorder() -> Snapshotter { + let recorder = DebuggingRecorder::new(); + let snapshotter = recorder.snapshotter(); + recorder.install().unwrap(); + snapshotter + } + + #[test] + fn test_executor_metrics_hook_metrics_recorded() { + let snapshotter = setup_test_recorder(); + let metrics = ExecutorMetrics::default(); + + let input = BlockExecutionInput { + block: &BlockWithSenders::default(), + total_difficulty: Default::default(), + }; + + let (tx, _rx) = mpsc::channel(); + let expected_output = 42; + let state_hook = Box::new(ChannelStateHook { sender: tx, output: expected_output }); + + let result_and_state = ResultAndState { + result: ExecutionResult::Success { + reason: SuccessReason::Stop, + gas_used: 100, + output: Output::Call(Bytes::default()), + logs: vec![], + gas_refunded: 0, + }, + state: { + let mut state = EvmState::default(); + let storage = + EvmStorage::from_iter([(U256::from(1), EvmStorageSlot::new(U256::from(2)))]); + state.insert( + Default::default(), + Account { + info: AccountInfo { + balance: U256::from(100), + nonce: 10, + code_hash: B256::random(), + code: Default::default(), + }, + storage, + status: AccountStatus::Loaded, + }, + ); + state + }, + }; + let executor = MockExecutor { result_and_state }; + let _result = metrics.execute_metered(executor, input, state_hook).unwrap(); + + let snapshot = snapshotter.snapshot().into_vec(); + + for metric in snapshot { + let metric_name = metric.0.key().name(); + if metric_name == "sync.execution.accounts_loaded_histogram" || + metric_name == "sync.execution.storage_slots_loaded_histogram" || + metric_name == "sync.execution.bytecodes_loaded_histogram" + { + if let DebugValue::Histogram(vs) = metric.3 { + assert!( + vs.iter().any(|v| v.into_inner() > 0.0), + "metric {metric_name} not recorded" + ); + } + } + } + } + + #[test] + fn test_executor_metrics_hook_called() { + let metrics = ExecutorMetrics::default(); + + let input = BlockExecutionInput { + block: &BlockWithSenders::default(), + total_difficulty: Default::default(), + }; + + let (tx, rx) = mpsc::channel(); + let expected_output = 42; + let state_hook = Box::new(ChannelStateHook { sender: tx, output: expected_output }); + + let result_and_state = ResultAndState { + result: ExecutionResult::Revert { gas_used: 0, output: Default::default() }, + state: EvmState::default(), + }; + let executor = MockExecutor { result_and_state }; + let _result = metrics.execute_metered(executor, input, state_hook).unwrap(); + + let actual_output = rx.try_recv().unwrap(); + assert_eq!(actual_output, expected_output); + } +}