diff --git a/crates/engine/tree/src/tree/cached_state.rs b/crates/engine/tree/src/tree/cached_state.rs index 3409af9c52..039764d7e8 100644 --- a/crates/engine/tree/src/tree/cached_state.rs +++ b/crates/engine/tree/src/tree/cached_state.rs @@ -15,8 +15,8 @@ use reth_trie::{ MultiProofTargets, StorageMultiProof, StorageProof, TrieInput, }; use revm_primitives::map::DefaultHashBuilder; -use std::time::{Duration, Instant}; -use tracing::{debug, trace}; +use std::time::Duration; +use tracing::trace; pub(crate) type Cache = mini_moka::sync::Cache; @@ -48,35 +48,6 @@ where } } -impl CachedStateProvider { - /// Creates a new [`SavedCache`] from the given state updates and executed block hash. - /// - /// This does not update the code cache, because no changes are required to the code cache on - /// state change. - /// - /// NOTE: Consumers should ensure that these caches are not in use by a state provider for a - /// previous block - otherwise, this update will cause that state provider to contain future - /// state, which would be incorrect. - pub(crate) fn save_cache( - self, - executed_block_hash: B256, - state_updates: &BundleState, - ) -> Result { - let Self { caches, metrics, state_provider: _ } = self; - let start = Instant::now(); - - caches.insert_state(state_updates)?; - - // create a saved cache with the executed block hash, same metrics, and updated caches - let saved_cache = SavedCache { hash: executed_block_hash, caches, metrics }; - saved_cache.update_metrics(); - - debug!(target: "engine::caching", update_latency=?start.elapsed(), "Updated state caches"); - - Ok(saved_cache) - } -} - /// Metrics for the cached state provider, showing hits / misses for each cache #[derive(Metrics, Clone)] #[metrics(scope = "sync.caching")] diff --git a/crates/engine/tree/src/tree/metrics.rs b/crates/engine/tree/src/tree/metrics.rs index 839b6c9209..84a633815b 100644 --- a/crates/engine/tree/src/tree/metrics.rs +++ b/crates/engine/tree/src/tree/metrics.rs @@ -16,8 +16,6 @@ pub(crate) struct EngineApiMetrics { pub(crate) block_validation: BlockValidationMetrics, /// A copy of legacy blockchain tree metrics, to be replaced when we replace the old tree pub(crate) tree: TreeMetrics, - /// Metrics for transaction prewarming threads - pub(crate) prewarm: PrewarmThreadMetrics, } /// Metrics for the entire blockchain tree @@ -71,10 +69,6 @@ pub(crate) struct BlockValidationMetrics { pub(crate) state_root_duration: Gauge, /// Trie input computation duration pub(crate) trie_input_duration: Gauge, - /// Cache saving duration - pub(crate) cache_saving_duration: Gauge, - /// State root config creation duration - pub(crate) state_root_config_duration: Gauge, } impl BlockValidationMetrics { @@ -87,26 +81,6 @@ impl BlockValidationMetrics { } } -/// Metrics for prewarming threads -#[derive(Metrics, Clone)] -#[metrics(scope = "sync.prewarm")] -pub(crate) struct PrewarmThreadMetrics { - /// Prewarm thread spawn duration - pub(crate) spawn_duration: Gauge, - /// A histogram of the prewarm thread spawn duration - pub(crate) spawn_duration_histogram: Histogram, - /// The number of transactions in the block - pub(crate) transactions: Gauge, - /// A histogram of the number of transactions in the block - pub(crate) transactions_histogram: Histogram, - /// A histogram of total runtime durations for prewarm threads - pub(crate) total_runtime: Histogram, - /// A histogram of execution durations for prewarm threads - pub(crate) execution_duration: Histogram, - /// A histogram for total prefetch targets in prewarm threads - pub(crate) prefetch_storage_targets: Histogram, -} - /// Metrics for the blockchain tree block buffer #[derive(Metrics)] #[metrics(scope = "blockchain_tree.block_buffer")] diff --git a/crates/engine/tree/src/tree/mod.rs b/crates/engine/tree/src/tree/mod.rs index ee5c63a794..50dd515b83 100644 --- a/crates/engine/tree/src/tree/mod.rs +++ b/crates/engine/tree/src/tree/mod.rs @@ -3,24 +3,18 @@ use crate::{ chain::FromOrchestrator, engine::{DownloadRequest, EngineApiEvent, EngineApiKind, EngineApiRequest, FromEngine}, persistence::PersistenceHandle, - tree::{ - cached_state::{CachedStateMetrics, CachedStateProvider, ProviderCacheBuilder}, - metrics::EngineApiMetrics, - }, + tree::{cached_state::CachedStateProvider, metrics::EngineApiMetrics}, }; -use alloy_consensus::{transaction::Recovered, BlockHeader}; +use alloy_consensus::BlockHeader; use alloy_eips::BlockNumHash; use alloy_primitives::{ - keccak256, - map::{B256Set, HashMap, HashSet}, + map::{HashMap, HashSet}, BlockNumber, B256, U256, }; use alloy_rpc_types_engine::{ ForkchoiceState, PayloadStatus, PayloadStatusEnum, PayloadValidationError, }; -use cached_state::{ProviderCaches, SavedCache}; use error::{InsertBlockError, InsertBlockErrorKind, InsertBlockFatalError}; -use metrics::PrewarmThreadMetrics; use persistence_state::CurrentPersistenceAction; use reth_chain_state::{ CanonicalInMemoryState, ExecutedBlock, ExecutedBlockWithTrieUpdates, @@ -34,33 +28,21 @@ use reth_engine_primitives::{ }; use reth_errors::{ConsensusError, ProviderResult}; use reth_ethereum_primitives::EthPrimitives; -use reth_evm::{ - execute::BlockExecutorProvider, - system_calls::{NoopHook, OnStateHook}, - ConfigureEvm, Evm, -}; +use reth_evm::{execute::BlockExecutorProvider, ConfigureEvm}; use reth_payload_builder::PayloadBuilderHandle; use reth_payload_primitives::{EngineApiMessageVersion, PayloadBuilderAttributes}; use reth_primitives_traits::{ Block, GotExpected, NodePrimitives, RecoveredBlock, SealedBlock, SealedHeader, - SignedTransaction, }; use reth_provider::{ - providers::ConsistentDbView, BlockReader, DBProvider, DatabaseProviderFactory, - ExecutionOutcome, HashedPostStateProvider, ProviderError, StateCommitmentProvider, - StateProviderBox, StateProviderFactory, StateReader, StateRootProvider, TransactionVariant, + providers::ConsistentDbView, BlockReader, DatabaseProviderFactory, ExecutionOutcome, + HashedPostStateProvider, ProviderError, StateCommitmentProvider, StateProviderBox, + StateProviderFactory, StateReader, StateRootProvider, TransactionVariant, }; -use reth_revm::{cancelled::ManualCancel, database::StateProviderDatabase}; +use reth_revm::database::StateProviderDatabase; use reth_stages_api::ControlFlow; -use reth_trie::{ - trie_cursor::InMemoryTrieCursorFactory, updates::TrieUpdates, HashedPostState, - MultiProofTargets, TrieInput, -}; -use reth_trie_db::DatabaseTrieCursorFactory; +use reth_trie::{updates::TrieUpdates, HashedPostState, TrieInput}; use reth_trie_parallel::root::{ParallelStateRoot, ParallelStateRootError}; -use root::{ - StateRootComputeOutcome, StateRootConfig, StateRootHandle, StateRootMessage, StateRootTask, -}; use std::{ cmp::Ordering, collections::{btree_map, hash_map, BTreeMap, VecDeque}, @@ -68,9 +50,9 @@ use std::{ ops::Bound, sync::{ mpsc::{Receiver, RecvError, RecvTimeoutError, Sender}, - Arc, RwLock, + Arc, }, - time::{Duration, Instant}, + time::Instant, }; use tokio::sync::{ mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}, @@ -88,6 +70,8 @@ mod metrics; mod payload_processor; mod persistence_state; pub mod root; +// TODO(alexey): compare trie updates in `insert_block_inner` +#[allow(unused)] mod trie_updates; use crate::tree::{ @@ -100,7 +84,6 @@ pub use config::TreeConfig; pub use invalid_block_hook::{InvalidBlockHooks, NoopInvalidBlockHook}; pub use invalid_headers::InvalidHeaderCache; pub use persistence_state::PersistenceState; -use trie_updates::compare_trie_updates; /// Keeps track of the state of the tree. /// @@ -456,7 +439,7 @@ pub struct StateProviderBuilder { impl StateProviderBuilder { /// Creates a new state provider from the provider factory, historical block hash and optional - /// overlayed blocks. + /// overlaid blocks. fn new( provider_factory: P, historical: B256, @@ -612,11 +595,6 @@ where engine_kind: EngineApiKind, /// The type responsible for processing new payloads payload_processor: PayloadProcessor, - - /// The most recent cache used for execution. - most_recent_cache: Option, - /// Thread pool used for the state root task and prewarming - thread_pool: Arc, } impl std::fmt::Debug @@ -683,16 +661,6 @@ where ) -> Self { let (incoming_tx, incoming) = std::sync::mpsc::channel(); - let num_threads = root::rayon_thread_pool_size(); - - let thread_pool = Arc::new( - rayon::ThreadPoolBuilder::new() - .num_threads(num_threads) - .thread_name(|i| format!("srt-worker-{}", i)) - .build() - .expect("Failed to create proof worker thread pool"), - ); - let payload_processor = PayloadProcessor::new(WorkloadExecutor::new(), evm_config.clone(), &config); @@ -716,8 +684,6 @@ where invalid_block_hook: Box::new(NoopInvalidBlockHook), engine_kind, payload_processor, - most_recent_cache: None, - thread_pool, } } @@ -1797,37 +1763,6 @@ where Ok(block) } - /// Returns the state provider for the requested block hash. - /// - /// This merges the state of all blocks that are part of the chain that the requested block is - /// the head of and are not yet persisted on disk. This includes all blocks that connect back to - /// a canonical block on disk. - /// - /// Returns `None` if the state for the requested hash is not found, this happens if the - /// requested state belongs to a block that is not connected to the canonical chain. - /// - /// Returns an error if we failed to fetch the state from the database. - fn state_provider(&self, hash: B256) -> ProviderResult> { - if let Some((historical, blocks)) = self.state.tree_state.blocks_by_hash(hash) { - debug!(target: "engine::tree", %hash, %historical, "found canonical state for block in memory"); - // the block leads back to the canonical chain - let historical = self.provider.state_by_block_hash(historical)?; - return Ok(Some(Box::new(MemoryOverlayStateProvider::new(historical, blocks)))) - } - - // the hash could belong to an unknown block or a persisted block - if let Some(header) = self.provider.header(&hash)? { - debug!(target: "engine::tree", %hash, number = %header.number(), "found canonical state for block in database"); - // the block is known and persisted - let historical = self.provider.state_by_block_hash(hash)?; - return Ok(Some(historical)) - } - - debug!(target: "engine::tree", %hash, "no canonical state found for block"); - - Ok(None) - } - /// Return the parent hash of the lowest buffered ancestor for the requested block, if there /// are any buffered ancestors. If there are no buffered ancestors, and the block itself does /// not exist in the buffer, this returns the hash that is passed in. @@ -2340,24 +2275,11 @@ where Ok(None) } - /// This fetches the most recent saved cache, using the hash of the block we are trying to - /// execute on top of. - /// - /// If the hash does not match the saved cache's hash, then the only saved cache doesn't contain - /// state useful for this block's execution, and we return `None`. - /// - /// If there is no cache saved, this returns `None`. - /// - /// This `take`s the cache, to avoid cloning the entire cache. - fn take_latest_cache(&mut self, parent_hash: B256) -> Option { - self.most_recent_cache.take_if(|cache| cache.executed_block_hash() == parent_hash) - } - fn insert_block( &mut self, block: RecoveredBlock, ) -> Result> { - self.insert_block_inner2(block.clone()) + self.insert_block_inner(block.clone()) .map_err(|kind| InsertBlockError::new(block.into_sealed_block(), kind)) } @@ -2380,311 +2302,6 @@ where // validate block consensus rules self.validate_block(&block)?; - trace!(target: "engine::tree", block=?block_num_hash, parent=?block.parent_hash(), "Fetching block state provider"); - let Some(state_provider) = self.state_provider(block.parent_hash())? else { - // we don't have the state required to execute this block, buffering it and find the - // missing parent block - let missing_ancestor = self - .state - .buffer - .lowest_ancestor(&block.parent_hash()) - .map(|block| block.parent_num_hash()) - .unwrap_or_else(|| block.parent_num_hash()); - - self.state.buffer.insert_block(block); - - return Ok(InsertPayloadOk::Inserted(BlockStatus::Disconnected { - head: self.state.tree_state.current_canonical_head, - missing_ancestor, - })) - }; - - // now validate against the parent - let parent_block = self.sealed_header_by_hash(block.parent_hash())?.ok_or_else(|| { - InsertBlockErrorKind::Provider(ProviderError::HeaderNotFound( - block.parent_hash().into(), - )) - })?; - if let Err(e) = - self.consensus.validate_header_against_parent(block.sealed_header(), &parent_block) - { - warn!(target: "engine::tree", ?block, "Failed to validate header {} against parent: {e}", block.hash()); - return Err(e.into()) - } - - // We only run the parallel state root if we are currently persisting blocks that are all - // ancestors of the one we are executing. If we're committing ancestor blocks, then: any - // trie updates being committed are a subset of the in-memory trie updates collected before - // fetching reverts. So any diff in reverts (pre vs post commit) is already covered by the - // in-memory trie updates we collect in `compute_state_root_parallel`. - // - // See https://github.com/paradigmxyz/reth/issues/12688 for more details - let is_descendant_of_persisting_blocks = - self.is_descendant_of_persisting_blocks(block.header()); - - // Atomic bool for letting the prewarm tasks know when to stop - let cancel_execution = ManualCancel::default(); - - let (state_root_handle, state_root_task_config, state_root_sender, state_hook) = - if is_descendant_of_persisting_blocks && self.config.use_state_root_task() { - let consistent_view = ConsistentDbView::new_with_latest_tip(self.provider.clone())?; - - // Compute trie input - let trie_input_start = Instant::now(); - let trie_input = self - .compute_trie_input(consistent_view.clone(), block.header().parent_hash()) - .map_err(|e| InsertBlockErrorKind::Other(Box::new(e)))?; - - // Create state root config - let config_start = Instant::now(); - let state_root_config = - StateRootConfig::new_from_input(consistent_view, trie_input); - - let trie_input_elapsed = config_start - trie_input_start; - self.metrics - .block_validation - .trie_input_duration - .set(trie_input_elapsed.as_secs_f64()); - - let config_elapsed = config_start.elapsed(); - self.metrics - .block_validation - .state_root_config_duration - .set(config_elapsed.as_secs_f64()); - - let state_root_task = - StateRootTask::new(state_root_config.clone(), self.thread_pool.clone()); - let state_root_sender = state_root_task.state_root_message_sender(); - let state_hook = Box::new(state_root_task.state_hook()) as Box; - ( - Some(state_root_task.spawn()), - Some(state_root_config), - Some(state_root_sender), - state_hook, - ) - } else { - (None, None, None, Box::new(NoopHook::default()) as Box) - }; - - let (caches, cache_metrics) = if let Some(cache) = - self.take_latest_cache(block.parent_hash()) - { - cache.split() - } else { - ( - ProviderCacheBuilder::default().build_caches(self.config.cross_block_cache_size()), - CachedStateMetrics::zeroed(), - ) - }; - - // Use cached state provider before executing, used in execution after prewarming threads - // complete - let state_provider = CachedStateProvider::new_with_caches( - state_provider, - caches.clone(), - cache_metrics.clone(), - ); - - // This prevents caches from being saved without all prewarm execution tasks being completed - let prewarm_task_lock = Arc::new(RwLock::new(())); - - if self.config.use_caching_and_prewarming() { - debug!(target: "engine::tree", "Spawning prewarm threads"); - let prewarm_start = Instant::now(); - let prewarm_metrics = self.metrics.prewarm.clone(); - - // Prewarm transactions - for (tx_idx, tx) in block.transactions_recovered().enumerate() { - let state_root_sender = state_root_sender.clone(); - - let start = Instant::now(); - self.prewarm_transaction( - block.header().clone(), - tx.cloned(), - caches.clone(), - cache_metrics.clone(), - state_root_sender, - cancel_execution.clone(), - prewarm_task_lock.clone(), - prewarm_metrics.clone(), - )?; - let elapsed = start.elapsed(); - debug!(target: "engine::tree", ?tx_idx, elapsed = ?elapsed, "Spawned transaction prewarm"); - } - - prewarm_metrics.transactions.set(block.transaction_count() as f64); - prewarm_metrics.transactions_histogram.record(block.transaction_count() as f64); - - drop(state_root_sender); - let elapsed = prewarm_start.elapsed(); - debug!(target: "engine::tree", ?elapsed, "Done spawning prewarm threads"); - - self.metrics.prewarm.spawn_duration.set(elapsed); - self.metrics.prewarm.spawn_duration_histogram.record(elapsed); - } - trace!(target: "engine::tree", block=?block_num_hash, "Executing block"); - - let executor = self.executor_provider.executor(StateProviderDatabase::new(&state_provider)); - let execution_start = Instant::now(); - let output = self.metrics.executor.execute_metered(executor, &block, state_hook)?; - let execution_time = execution_start.elapsed(); - trace!(target: "engine::tree", elapsed = ?execution_time, number=?block_num_hash.number, "Executed block"); - - // Ensure that prewarm tasks don't send proof messages after state root sender is dropped - cancel_execution.cancel(); - - if let Err(err) = self.consensus.validate_block_post_execution(&block, &output) { - // call post-block hook - self.invalid_block_hook.on_invalid_block(&parent_block, &block, &output, None); - return Err(err.into()) - } - - let hashed_state = self.provider.hashed_post_state(&output.state); - - if let Err(err) = self - .payload_validator - .validate_block_post_execution_with_hashed_state(&hashed_state, &block) - { - // call post-block hook - self.invalid_block_hook.on_invalid_block(&parent_block, &block, &output, None); - return Err(err.into()) - } - - trace!(target: "engine::tree", block=?block_num_hash, "Calculating block state root"); - let root_time = Instant::now(); - - // We attempt to compute state root in parallel if we are currently not persisting - // anything to database. This is safe, because the database state cannot - // change until we finish parallel computation. It is important that nothing - // is being persisted as we are computing in parallel, because we initialize - // a different database transaction per thread and it might end up with a - // different view of the database. - let (state_root, trie_output, root_elapsed) = if is_descendant_of_persisting_blocks { - if self.config.use_state_root_task() { - let state_root_handle = state_root_handle - .expect("state root handle must exist if legacy_state_root is false"); - let state_root_config = state_root_task_config.expect("task config is present"); - - // Handle state root result from task using handle - self.handle_state_root_result( - state_root_handle, - state_root_config, - block.sealed_block(), - &hashed_state, - &state_provider, - root_time, - )? - } else { - match self.compute_state_root_parallel(block.header().parent_hash(), &hashed_state) - { - Ok(result) => { - info!( - target: "engine::tree", - block = ?block_num_hash, - regular_state_root = ?result.0, - "Regular root task finished" - ); - (result.0, result.1, root_time.elapsed()) - } - Err(ParallelStateRootError::Provider(ProviderError::ConsistentView(error))) => { - debug!(target: "engine", %error, "Parallel state root computation failed consistency check, falling back"); - let (root, updates) = - state_provider.state_root_with_updates(hashed_state.clone())?; - (root, updates, root_time.elapsed()) - } - Err(error) => return Err(InsertBlockErrorKind::Other(Box::new(error))), - } - } - } else { - debug!(target: "engine::tree", block=?block_num_hash, ?is_descendant_of_persisting_blocks, "Failed to compute state root in parallel"); - let (root, updates) = state_provider.state_root_with_updates(hashed_state.clone())?; - (root, updates, root_time.elapsed()) - }; - - if state_root != block.header().state_root() { - // call post-block hook - self.invalid_block_hook.on_invalid_block( - &parent_block, - &block, - &output, - Some((&trie_output, state_root)), - ); - return Err(ConsensusError::BodyStateRootDiff( - GotExpected { got: state_root, expected: block.header().state_root() }.into(), - ) - .into()) - } - - self.metrics.block_validation.record_state_root(&trie_output, root_elapsed.as_secs_f64()); - debug!(target: "engine::tree", ?root_elapsed, block=?block_num_hash, "Calculated state root"); - - if self.config.use_caching_and_prewarming() { - let save_cache_start = Instant::now(); - // this is the only place / thread a writer is acquired, so we would have already - // crashed if we had a poisoned rwlock - // - // we use a lock here and in prewarming, so we do not save the cache if a prewarm task - // is still running, since it would update the cache with stale data. It's unlikely that - // prewarm tasks are still running at this point however - drop(prewarm_task_lock.write().unwrap()); - // apply state updates to cache and save it (if saving was successful) - self.most_recent_cache = state_provider.save_cache(block.hash(), &output.state).ok(); - let elapsed = save_cache_start.elapsed(); - - // record how long it took to save caches - self.metrics.block_validation.cache_saving_duration.set(elapsed.as_secs_f64()); - } - - let executed: ExecutedBlockWithTrieUpdates = ExecutedBlockWithTrieUpdates { - block: ExecutedBlock { - recovered_block: Arc::new(block), - execution_output: Arc::new(ExecutionOutcome::from((output, block_num_hash.number))), - hashed_state: Arc::new(hashed_state), - }, - trie: Arc::new(trie_output), - }; - - // if the parent is the canonical head, we can insert the block as the pending block - if self.state.tree_state.canonical_block_hash() == executed.recovered_block().parent_hash() - { - debug!(target: "engine::tree", pending=?block_num_hash, "updating pending block"); - self.canonical_in_memory_state.set_pending_block(executed.clone()); - } - - self.state.tree_state.insert_executed(executed.clone()); - self.metrics.engine.executed_blocks.set(self.state.tree_state.block_count() as f64); - - // emit insert event - let elapsed = start.elapsed(); - let engine_event = if self.is_fork(block_num_hash.hash)? { - BeaconConsensusEngineEvent::ForkBlockAdded(executed, elapsed) - } else { - BeaconConsensusEngineEvent::CanonicalBlockAdded(executed, elapsed) - }; - self.emit_event(EngineApiEvent::BeaconConsensus(engine_event)); - - debug!(target: "engine::tree", block=?block_num_hash, "Finished inserting block"); - Ok(InsertPayloadOk::Inserted(BlockStatus::Valid)) - } - - fn insert_block_inner2( - &mut self, - block: RecoveredBlock, - ) -> Result { - let block_num_hash = block.num_hash(); - debug!(target: "engine::tree", block=?block_num_hash, parent = ?block.parent_hash(), state_root = ?block.state_root(), "Inserting new block into tree"); - - if self.block_by_hash(block.hash())?.is_some() { - return Ok(InsertPayloadOk::AlreadySeen(BlockStatus::Valid)) - } - - let start = Instant::now(); - - trace!(target: "engine::tree", block=?block_num_hash, "Validating block consensus"); - - // validate block consensus rules - self.validate_block(&block)?; - trace!(target: "engine::tree", block=?block_num_hash, parent=?block.parent_hash(), "Fetching block state provider"); let Some(provider_builder) = self.state_provider_builder(block.parent_hash())? else { // we don't have the state required to execute this block, buffering it and find the @@ -2950,144 +2567,6 @@ where Ok(input) } - /// Runs execution for a single transaction, spawning it in the prewarm threadpool. - #[allow(clippy::too_many_arguments)] - fn prewarm_transaction( - &self, - block: N::BlockHeader, - tx: Recovered, - caches: ProviderCaches, - cache_metrics: CachedStateMetrics, - state_root_sender: Option>, - cancel_execution: ManualCancel, - task_finished: Arc>, - metrics: PrewarmThreadMetrics, - ) -> Result<(), InsertBlockErrorKind> { - // Get the builder once, outside the thread - let Some(state_provider_builder) = self.state_provider_builder(block.parent_hash())? else { - trace!( - target: "engine::tree", - parent=%block.parent_hash(), - "Could not get state provider builder for prewarm", - ); - return Ok(()) - }; - - // clone and copy info required for execution - let evm_config = self.evm_config.clone(); - - // spawn task executing the individual tx - self.thread_pool.spawn(move || { - let thread_start = Instant::now(); - let in_progress = task_finished.read().unwrap(); - - // Create the state provider inside the thread - let state_provider = match state_provider_builder.build() { - Ok(provider) => provider, - Err(err) => { - trace!( - target: "engine::tree", - %err, - "Failed to build state provider in prewarm thread" - ); - return - } - }; - - // Use the caches to create a new provider with caching - let state_provider = - CachedStateProvider::new_with_caches(state_provider, caches, cache_metrics); - - let state_provider = StateProviderDatabase::new(&state_provider); - - let mut evm_env = evm_config.evm_env(&block); - - evm_env.cfg_env.disable_nonce_check = true; - - // create a new executor and disable nonce checks in the env - let mut evm = evm_config.evm_with_env(state_provider, evm_env); - - // create the tx env and reset nonce - let tx_env = evm_config.tx_env(&tx); - - // exit early if execution is done - if cancel_execution.is_cancelled() { - return - } - - let execution_start = Instant::now(); - let res = match evm.transact(tx_env) { - Ok(res) => res, - Err(err) => { - trace!( - target: "engine::tree", - %err, - tx_hash=%tx.tx_hash(), - sender=%tx.signer(), - "Error when executing prewarm transaction", - ); - return - } - }; - metrics.execution_duration.record(execution_start.elapsed()); - - // execution no longer in progress, so we can drop the lock - drop(in_progress); - - // if execution is finished there is no point to sending proof targets - if cancel_execution.is_cancelled() { - return - } - - let Some(state_root_sender) = state_root_sender else { return }; - - let mut targets = MultiProofTargets::with_capacity(res.state.len()); - let mut storage_targets = 0; - for (addr, account) in res.state { - // if the account was not touched, or if the account was selfdestructed, do not - // fetch proofs for it - // - // Since selfdestruct can only happen in the same transaction, we can skip - // prefetching proofs for selfdestructed accounts - // - // See: https://eips.ethereum.org/EIPS/eip-6780 - if !account.is_touched() || account.is_selfdestructed() { - continue - } - - let mut storage_set = - B256Set::with_capacity_and_hasher(account.storage.len(), Default::default()); - for (key, slot) in account.storage { - // do nothing if unchanged - if !slot.is_changed() { - continue - } - - storage_set.insert(keccak256(B256::new(key.to_be_bytes()))); - } - - storage_targets += storage_set.len(); - targets.insert(keccak256(addr), storage_set); - } - - debug!( - target: "engine::tree", - tx_hash = ?tx.tx_hash(), - targets = targets.len(), - storage_targets, - "Prefetching proofs for a transaction" - ); - metrics.prefetch_storage_targets.record(storage_targets as f64); - - let _ = state_root_sender.send(StateRootMessage::PrefetchProofs(targets)); - - // record final metrics - metrics.total_runtime.record(thread_start.elapsed()); - }); - - Ok(()) - } - /// Handles an error that occurred while inserting a block. /// /// If this is a validation error this will mark the block as invalid. @@ -3126,75 +2605,6 @@ where )) } - /// Waits for the result on the input [`StateRootHandle`], and handles it, falling back to - /// the hash builder-based state root calculation if it fails. - fn handle_state_root_result( - &self, - state_root_handle: StateRootHandle, - state_root_task_config: StateRootConfig

, - sealed_block: &SealedBlock, - hashed_state: &HashedPostState, - state_provider: impl StateRootProvider, - root_time: Instant, - ) -> Result<(B256, TrieUpdates, Duration), InsertBlockErrorKind> { - match state_root_handle.wait_for_result() { - Ok(StateRootComputeOutcome { - state_root: (task_state_root, task_trie_updates), - time_from_last_update, - .. - }) => { - info!( - target: "engine::tree", - block = ?sealed_block.num_hash(), - ?task_state_root, - task_elapsed = ?time_from_last_update, - "State root task finished" - ); - - if task_state_root != sealed_block.header().state_root() || - self.config.always_compare_trie_updates() - { - if task_state_root != sealed_block.header().state_root() { - debug!(target: "engine::tree", "Task state root does not match block state root"); - } - - let (regular_root, regular_updates) = - state_provider.state_root_with_updates(hashed_state.clone())?; - - if regular_root == sealed_block.header().state_root() { - let provider_ro = state_root_task_config.consistent_view.provider_ro()?; - let in_memory_trie_cursor = InMemoryTrieCursorFactory::new( - DatabaseTrieCursorFactory::new(provider_ro.tx_ref()), - &state_root_task_config.nodes_sorted, - ); - compare_trie_updates( - in_memory_trie_cursor, - task_trie_updates.clone(), - regular_updates.clone(), - ) - .map_err(ProviderError::from)?; - if task_state_root != sealed_block.header().state_root() { - return Ok((regular_root, regular_updates, time_from_last_update)); - } - } else { - debug!(target: "engine::tree", - "Regular state root does not match block state root" - ); - } - } - - Ok((task_state_root, task_trie_updates, time_from_last_update)) - } - Err(error) => { - info!(target: "engine::tree", ?error, "Failed to wait for state root task result"); - // Fall back to sequential calculation - let (root, updates) = - state_provider.state_root_with_updates(hashed_state.clone())?; - Ok((root, updates, root_time.elapsed())) - } - } - } - /// Attempts to find the header for the given block hash if it is canonical. pub fn find_canonical_header( &self,