Mirror of https://github.com/paradigmxyz/reth.git, synced 2026-01-26 07:38:59 -05:00

chore(tree): remove old insert_block_inner method (#14754)
@@ -15,8 +15,8 @@ use reth_trie::{
    MultiProofTargets, StorageMultiProof, StorageProof, TrieInput,
};
use revm_primitives::map::DefaultHashBuilder;
use std::time::{Duration, Instant};
use tracing::{debug, trace};
use std::time::Duration;
use tracing::trace;

pub(crate) type Cache<K, V> =
    mini_moka::sync::Cache<K, V, alloy_primitives::map::DefaultHashBuilder>;
@@ -48,35 +48,6 @@ where
    }
}

impl<S> CachedStateProvider<S> {
    /// Creates a new [`SavedCache`] from the given state updates and executed block hash.
    ///
    /// This does not update the code cache, because no changes are required to the code cache on
    /// state change.
    ///
    /// NOTE: Consumers should ensure that these caches are not in use by a state provider for a
    /// previous block - otherwise, this update will cause that state provider to contain future
    /// state, which would be incorrect.
    pub(crate) fn save_cache(
        self,
        executed_block_hash: B256,
        state_updates: &BundleState,
    ) -> Result<SavedCache, ()> {
        let Self { caches, metrics, state_provider: _ } = self;
        let start = Instant::now();

        caches.insert_state(state_updates)?;

        // create a saved cache with the executed block hash, same metrics, and updated caches
        let saved_cache = SavedCache { hash: executed_block_hash, caches, metrics };
        saved_cache.update_metrics();

        debug!(target: "engine::caching", update_latency=?start.elapsed(), "Updated state caches");

        Ok(saved_cache)
    }
}

/// Metrics for the cached state provider, showing hits / misses for each cache
#[derive(Metrics, Clone)]
#[metrics(scope = "sync.caching")]
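The removed `save_cache` above consumes the provider, folds the block's state changes into the shared caches, and tags the result with the executed block hash so that only a child of that block may reuse it. A minimal std-only sketch of that contract follows; `CachedProvider`, `SavedCacheSketch`, and the flat `(address, balance)` update list are hypothetical stand-ins, not reth types.

    use std::collections::HashMap;

    /// Hypothetical, simplified stand-in for the provider caches: account balances by address.
    type AccountCaches = HashMap<[u8; 20], u64>;

    struct CachedProvider {
        caches: AccountCaches,
    }

    struct SavedCacheSketch {
        /// Hash of the block whose post-state the caches reflect.
        executed_block_hash: [u8; 32],
        caches: AccountCaches,
    }

    impl CachedProvider {
        /// Consumes the provider, applies the block's state changes to the caches, and tags the
        /// result with the executed block hash so the next block can only reuse it if it builds
        /// directly on top of that block.
        fn save_cache(
            mut self,
            executed_block_hash: [u8; 32],
            state_updates: &[([u8; 20], u64)],
        ) -> SavedCacheSketch {
            for (address, balance) in state_updates {
                self.caches.insert(*address, *balance);
            }
            SavedCacheSketch { executed_block_hash, caches: self.caches }
        }
    }

    fn main() {
        let provider = CachedProvider { caches: HashMap::new() };
        let saved = provider.save_cache([1; 32], &[([2; 20], 100)]);
        assert_eq!(saved.executed_block_hash, [1; 32]);
        assert_eq!(saved.caches.get(&[2; 20]), Some(&100));
    }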
@@ -16,8 +16,6 @@ pub(crate) struct EngineApiMetrics {
    pub(crate) block_validation: BlockValidationMetrics,
    /// A copy of legacy blockchain tree metrics, to be replaced when we replace the old tree
    pub(crate) tree: TreeMetrics,
    /// Metrics for transaction prewarming threads
    pub(crate) prewarm: PrewarmThreadMetrics,
}

/// Metrics for the entire blockchain tree
@@ -71,10 +69,6 @@ pub(crate) struct BlockValidationMetrics {
    pub(crate) state_root_duration: Gauge,
    /// Trie input computation duration
    pub(crate) trie_input_duration: Gauge,
    /// Cache saving duration
    pub(crate) cache_saving_duration: Gauge,
    /// State root config creation duration
    pub(crate) state_root_config_duration: Gauge,
}

impl BlockValidationMetrics {
@@ -87,26 +81,6 @@ impl BlockValidationMetrics {
    }
}

/// Metrics for prewarming threads
#[derive(Metrics, Clone)]
#[metrics(scope = "sync.prewarm")]
pub(crate) struct PrewarmThreadMetrics {
    /// Prewarm thread spawn duration
    pub(crate) spawn_duration: Gauge,
    /// A histogram of the prewarm thread spawn duration
    pub(crate) spawn_duration_histogram: Histogram,
    /// The number of transactions in the block
    pub(crate) transactions: Gauge,
    /// A histogram of the number of transactions in the block
    pub(crate) transactions_histogram: Histogram,
    /// A histogram of total runtime durations for prewarm threads
    pub(crate) total_runtime: Histogram,
    /// A histogram of execution durations for prewarm threads
    pub(crate) execution_duration: Histogram,
    /// A histogram for total prefetch targets in prewarm threads
    pub(crate) prefetch_storage_targets: Histogram,
}

/// Metrics for the blockchain tree block buffer
#[derive(Metrics)]
#[metrics(scope = "blockchain_tree.block_buffer")]
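The `Gauge` and `Histogram` fields above come from the metrics facade, and the `#[metrics(scope = ...)]` derive registers each field under that scope. A small sketch of the recording pattern, assuming a recent `metrics` facade crate (0.22 or later, where the macros return handles); the metric names mirror the scope/field naming above but are written out by hand for illustration, not generated by reth's derive.

    use std::time::Instant;

    // Record a spawn duration the way the derived metrics above are used: a gauge holds the
    // latest value, a histogram tracks the distribution. Without an installed recorder these
    // calls compile and run as no-ops.
    fn record_spawn(start: Instant) {
        let elapsed = start.elapsed().as_secs_f64();
        metrics::gauge!("sync.prewarm.spawn_duration").set(elapsed);
        metrics::histogram!("sync.prewarm.spawn_duration_histogram").record(elapsed);
    }

    fn main() {
        record_spawn(Instant::now());
    }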
@@ -3,24 +3,18 @@ use crate::{
    chain::FromOrchestrator,
    engine::{DownloadRequest, EngineApiEvent, EngineApiKind, EngineApiRequest, FromEngine},
    persistence::PersistenceHandle,
    tree::{
        cached_state::{CachedStateMetrics, CachedStateProvider, ProviderCacheBuilder},
        metrics::EngineApiMetrics,
    },
    tree::{cached_state::CachedStateProvider, metrics::EngineApiMetrics},
};
use alloy_consensus::{transaction::Recovered, BlockHeader};
use alloy_consensus::BlockHeader;
use alloy_eips::BlockNumHash;
use alloy_primitives::{
    keccak256,
    map::{B256Set, HashMap, HashSet},
    map::{HashMap, HashSet},
    BlockNumber, B256, U256,
};
use alloy_rpc_types_engine::{
    ForkchoiceState, PayloadStatus, PayloadStatusEnum, PayloadValidationError,
};
use cached_state::{ProviderCaches, SavedCache};
use error::{InsertBlockError, InsertBlockErrorKind, InsertBlockFatalError};
use metrics::PrewarmThreadMetrics;
use persistence_state::CurrentPersistenceAction;
use reth_chain_state::{
    CanonicalInMemoryState, ExecutedBlock, ExecutedBlockWithTrieUpdates,
@@ -34,33 +28,21 @@ use reth_engine_primitives::{
};
use reth_errors::{ConsensusError, ProviderResult};
use reth_ethereum_primitives::EthPrimitives;
use reth_evm::{
    execute::BlockExecutorProvider,
    system_calls::{NoopHook, OnStateHook},
    ConfigureEvm, Evm,
};
use reth_evm::{execute::BlockExecutorProvider, ConfigureEvm};
use reth_payload_builder::PayloadBuilderHandle;
use reth_payload_primitives::{EngineApiMessageVersion, PayloadBuilderAttributes};
use reth_primitives_traits::{
    Block, GotExpected, NodePrimitives, RecoveredBlock, SealedBlock, SealedHeader,
    SignedTransaction,
};
use reth_provider::{
    providers::ConsistentDbView, BlockReader, DBProvider, DatabaseProviderFactory,
    ExecutionOutcome, HashedPostStateProvider, ProviderError, StateCommitmentProvider,
    StateProviderBox, StateProviderFactory, StateReader, StateRootProvider, TransactionVariant,
    providers::ConsistentDbView, BlockReader, DatabaseProviderFactory, ExecutionOutcome,
    HashedPostStateProvider, ProviderError, StateCommitmentProvider, StateProviderBox,
    StateProviderFactory, StateReader, StateRootProvider, TransactionVariant,
};
use reth_revm::{cancelled::ManualCancel, database::StateProviderDatabase};
use reth_revm::database::StateProviderDatabase;
use reth_stages_api::ControlFlow;
use reth_trie::{
    trie_cursor::InMemoryTrieCursorFactory, updates::TrieUpdates, HashedPostState,
    MultiProofTargets, TrieInput,
};
use reth_trie_db::DatabaseTrieCursorFactory;
use reth_trie::{updates::TrieUpdates, HashedPostState, TrieInput};
use reth_trie_parallel::root::{ParallelStateRoot, ParallelStateRootError};
use root::{
    StateRootComputeOutcome, StateRootConfig, StateRootHandle, StateRootMessage, StateRootTask,
};
use std::{
    cmp::Ordering,
    collections::{btree_map, hash_map, BTreeMap, VecDeque},
@@ -68,9 +50,9 @@ use std::{
    ops::Bound,
    sync::{
        mpsc::{Receiver, RecvError, RecvTimeoutError, Sender},
        Arc, RwLock,
        Arc,
    },
    time::{Duration, Instant},
    time::Instant,
};
use tokio::sync::{
    mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
@@ -88,6 +70,8 @@ mod metrics;
mod payload_processor;
mod persistence_state;
pub mod root;
// TODO(alexey): compare trie updates in `insert_block_inner`
#[allow(unused)]
mod trie_updates;

use crate::tree::{
@@ -100,7 +84,6 @@ pub use config::TreeConfig;
pub use invalid_block_hook::{InvalidBlockHooks, NoopInvalidBlockHook};
pub use invalid_headers::InvalidHeaderCache;
pub use persistence_state::PersistenceState;
use trie_updates::compare_trie_updates;

/// Keeps track of the state of the tree.
///
@@ -456,7 +439,7 @@ pub struct StateProviderBuilder<N: NodePrimitives, P> {

impl<N: NodePrimitives, P> StateProviderBuilder<N, P> {
    /// Creates a new state provider from the provider factory, historical block hash and optional
    /// overlayed blocks.
    /// overlaid blocks.
    fn new(
        provider_factory: P,
        historical: B256,
@@ -612,11 +595,6 @@ where
    engine_kind: EngineApiKind,
    /// The type responsible for processing new payloads
    payload_processor: PayloadProcessor<N, C>,

    /// The most recent cache used for execution.
    most_recent_cache: Option<SavedCache>,
    /// Thread pool used for the state root task and prewarming
    thread_pool: Arc<rayon::ThreadPool>,
}

impl<N, P: Debug, E: Debug, T: EngineTypes + Debug, V: Debug, C: Debug> std::fmt::Debug
@@ -683,16 +661,6 @@ where
    ) -> Self {
        let (incoming_tx, incoming) = std::sync::mpsc::channel();

        let num_threads = root::rayon_thread_pool_size();

        let thread_pool = Arc::new(
            rayon::ThreadPoolBuilder::new()
                .num_threads(num_threads)
                .thread_name(|i| format!("srt-worker-{}", i))
                .build()
                .expect("Failed to create proof worker thread pool"),
        );

        let payload_processor =
            PayloadProcessor::new(WorkloadExecutor::new(), evm_config.clone(), &config);

@@ -716,8 +684,6 @@ where
            invalid_block_hook: Box::new(NoopInvalidBlockHook),
            engine_kind,
            payload_processor,
            most_recent_cache: None,
            thread_pool,
        }
    }
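The removed constructor builds a dedicated rayon pool with named worker threads for the state root task and prewarming. A small runnable sketch of the same pattern; the pool size, the thread-name prefix, and the trailing sleep are illustrative choices, not values taken from reth.

    use std::sync::Arc;

    fn main() {
        // Build a dedicated rayon pool with named worker threads, mirroring the pattern above.
        let pool = Arc::new(
            rayon::ThreadPoolBuilder::new()
                .num_threads(4)
                .thread_name(|i| format!("srt-worker-{i}"))
                .build()
                .expect("failed to build thread pool"),
        );

        // Fire-and-forget work is submitted with `spawn`; `install` would instead run a
        // closure inside the pool and wait for its result.
        pool.spawn(|| {
            println!("running on {:?}", std::thread::current().name());
        });

        // Give the detached task a moment to run before the program exits (demo only).
        std::thread::sleep(std::time::Duration::from_millis(50));
    }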
@@ -1797,37 +1763,6 @@ where
        Ok(block)
    }

    /// Returns the state provider for the requested block hash.
    ///
    /// This merges the state of all blocks that are part of the chain that the requested block is
    /// the head of and are not yet persisted on disk. This includes all blocks that connect back to
    /// a canonical block on disk.
    ///
    /// Returns `None` if the state for the requested hash is not found, this happens if the
    /// requested state belongs to a block that is not connected to the canonical chain.
    ///
    /// Returns an error if we failed to fetch the state from the database.
    fn state_provider(&self, hash: B256) -> ProviderResult<Option<StateProviderBox>> {
        if let Some((historical, blocks)) = self.state.tree_state.blocks_by_hash(hash) {
            debug!(target: "engine::tree", %hash, %historical, "found canonical state for block in memory");
            // the block leads back to the canonical chain
            let historical = self.provider.state_by_block_hash(historical)?;
            return Ok(Some(Box::new(MemoryOverlayStateProvider::new(historical, blocks))))
        }

        // the hash could belong to an unknown block or a persisted block
        if let Some(header) = self.provider.header(&hash)? {
            debug!(target: "engine::tree", %hash, number = %header.number(), "found canonical state for block in database");
            // the block is known and persisted
            let historical = self.provider.state_by_block_hash(hash)?;
            return Ok(Some(historical))
        }

        debug!(target: "engine::tree", %hash, "no canonical state found for block");

        Ok(None)
    }

    /// Return the parent hash of the lowest buffered ancestor for the requested block, if there
    /// are any buffered ancestors. If there are no buffered ancestors, and the block itself does
    /// not exist in the buffer, this returns the hash that is passed in.
@@ -2340,24 +2275,11 @@ where
        Ok(None)
    }

    /// This fetches the most recent saved cache, using the hash of the block we are trying to
    /// execute on top of.
    ///
    /// If the hash does not match the saved cache's hash, then the only saved cache doesn't contain
    /// state useful for this block's execution, and we return `None`.
    ///
    /// If there is no cache saved, this returns `None`.
    ///
    /// This `take`s the cache, to avoid cloning the entire cache.
    fn take_latest_cache(&mut self, parent_hash: B256) -> Option<SavedCache> {
        self.most_recent_cache.take_if(|cache| cache.executed_block_hash() == parent_hash)
    }

    fn insert_block(
        &mut self,
        block: RecoveredBlock<N::Block>,
    ) -> Result<InsertPayloadOk, InsertBlockError<N::Block>> {
        self.insert_block_inner2(block.clone())
        self.insert_block_inner(block.clone())
            .map_err(|kind| InsertBlockError::new(block.into_sealed_block(), kind))
    }
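`take_latest_cache` leans on `Option::take_if` (stable since Rust 1.80): the saved cache is moved out only when it was built on the parent of the block about to be executed, and otherwise it stays in place, so a stale cache is never cloned or reused. A minimal std-only sketch; `CacheSketch` and `Tree` are illustrative stand-ins, not reth types.

    struct CacheSketch {
        executed_block_hash: [u8; 32],
    }

    struct Tree {
        most_recent_cache: Option<CacheSketch>,
    }

    impl Tree {
        /// Move the cache out only if it was built on top of `parent_hash`; otherwise leave it alone.
        fn take_latest_cache(&mut self, parent_hash: [u8; 32]) -> Option<CacheSketch> {
            self.most_recent_cache.take_if(|cache| cache.executed_block_hash == parent_hash)
        }
    }

    fn main() {
        let mut tree =
            Tree { most_recent_cache: Some(CacheSketch { executed_block_hash: [1; 32] }) };

        // Wrong parent: the cache stays in place.
        assert!(tree.take_latest_cache([2; 32]).is_none());
        assert!(tree.most_recent_cache.is_some());

        // Matching parent: the cache is moved out without cloning it.
        assert!(tree.take_latest_cache([1; 32]).is_some());
        assert!(tree.most_recent_cache.is_none());
    }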
@@ -2380,311 +2302,6 @@ where
        // validate block consensus rules
        self.validate_block(&block)?;

        trace!(target: "engine::tree", block=?block_num_hash, parent=?block.parent_hash(), "Fetching block state provider");
        let Some(state_provider) = self.state_provider(block.parent_hash())? else {
            // we don't have the state required to execute this block, buffering it and find the
            // missing parent block
            let missing_ancestor = self
                .state
                .buffer
                .lowest_ancestor(&block.parent_hash())
                .map(|block| block.parent_num_hash())
                .unwrap_or_else(|| block.parent_num_hash());

            self.state.buffer.insert_block(block);

            return Ok(InsertPayloadOk::Inserted(BlockStatus::Disconnected {
                head: self.state.tree_state.current_canonical_head,
                missing_ancestor,
            }))
        };

        // now validate against the parent
        let parent_block = self.sealed_header_by_hash(block.parent_hash())?.ok_or_else(|| {
            InsertBlockErrorKind::Provider(ProviderError::HeaderNotFound(
                block.parent_hash().into(),
            ))
        })?;
        if let Err(e) =
            self.consensus.validate_header_against_parent(block.sealed_header(), &parent_block)
        {
            warn!(target: "engine::tree", ?block, "Failed to validate header {} against parent: {e}", block.hash());
            return Err(e.into())
        }

        // We only run the parallel state root if we are currently persisting blocks that are all
        // ancestors of the one we are executing. If we're committing ancestor blocks, then: any
        // trie updates being committed are a subset of the in-memory trie updates collected before
        // fetching reverts. So any diff in reverts (pre vs post commit) is already covered by the
        // in-memory trie updates we collect in `compute_state_root_parallel`.
        //
        // See https://github.com/paradigmxyz/reth/issues/12688 for more details
        let is_descendant_of_persisting_blocks =
            self.is_descendant_of_persisting_blocks(block.header());

        // Atomic bool for letting the prewarm tasks know when to stop
        let cancel_execution = ManualCancel::default();

        let (state_root_handle, state_root_task_config, state_root_sender, state_hook) =
            if is_descendant_of_persisting_blocks && self.config.use_state_root_task() {
                let consistent_view = ConsistentDbView::new_with_latest_tip(self.provider.clone())?;

                // Compute trie input
                let trie_input_start = Instant::now();
                let trie_input = self
                    .compute_trie_input(consistent_view.clone(), block.header().parent_hash())
                    .map_err(|e| InsertBlockErrorKind::Other(Box::new(e)))?;

                // Create state root config
                let config_start = Instant::now();
                let state_root_config =
                    StateRootConfig::new_from_input(consistent_view, trie_input);

                let trie_input_elapsed = config_start - trie_input_start;
                self.metrics
                    .block_validation
                    .trie_input_duration
                    .set(trie_input_elapsed.as_secs_f64());

                let config_elapsed = config_start.elapsed();
                self.metrics
                    .block_validation
                    .state_root_config_duration
                    .set(config_elapsed.as_secs_f64());

                let state_root_task =
                    StateRootTask::new(state_root_config.clone(), self.thread_pool.clone());
                let state_root_sender = state_root_task.state_root_message_sender();
                let state_hook = Box::new(state_root_task.state_hook()) as Box<dyn OnStateHook>;
                (
                    Some(state_root_task.spawn()),
                    Some(state_root_config),
                    Some(state_root_sender),
                    state_hook,
                )
            } else {
                (None, None, None, Box::new(NoopHook::default()) as Box<dyn OnStateHook>)
            };
        let (caches, cache_metrics) = if let Some(cache) =
            self.take_latest_cache(block.parent_hash())
        {
            cache.split()
        } else {
            (
                ProviderCacheBuilder::default().build_caches(self.config.cross_block_cache_size()),
                CachedStateMetrics::zeroed(),
            )
        };

        // Use cached state provider before executing, used in execution after prewarming threads
        // complete
        let state_provider = CachedStateProvider::new_with_caches(
            state_provider,
            caches.clone(),
            cache_metrics.clone(),
        );

        // This prevents caches from being saved without all prewarm execution tasks being completed
        let prewarm_task_lock = Arc::new(RwLock::new(()));

        if self.config.use_caching_and_prewarming() {
            debug!(target: "engine::tree", "Spawning prewarm threads");
            let prewarm_start = Instant::now();
            let prewarm_metrics = self.metrics.prewarm.clone();

            // Prewarm transactions
            for (tx_idx, tx) in block.transactions_recovered().enumerate() {
                let state_root_sender = state_root_sender.clone();

                let start = Instant::now();
                self.prewarm_transaction(
                    block.header().clone(),
                    tx.cloned(),
                    caches.clone(),
                    cache_metrics.clone(),
                    state_root_sender,
                    cancel_execution.clone(),
                    prewarm_task_lock.clone(),
                    prewarm_metrics.clone(),
                )?;
                let elapsed = start.elapsed();
                debug!(target: "engine::tree", ?tx_idx, elapsed = ?elapsed, "Spawned transaction prewarm");
            }

            prewarm_metrics.transactions.set(block.transaction_count() as f64);
            prewarm_metrics.transactions_histogram.record(block.transaction_count() as f64);

            drop(state_root_sender);
            let elapsed = prewarm_start.elapsed();
            debug!(target: "engine::tree", ?elapsed, "Done spawning prewarm threads");

            self.metrics.prewarm.spawn_duration.set(elapsed);
            self.metrics.prewarm.spawn_duration_histogram.record(elapsed);
        }
        trace!(target: "engine::tree", block=?block_num_hash, "Executing block");

        let executor = self.executor_provider.executor(StateProviderDatabase::new(&state_provider));
        let execution_start = Instant::now();
        let output = self.metrics.executor.execute_metered(executor, &block, state_hook)?;
        let execution_time = execution_start.elapsed();
        trace!(target: "engine::tree", elapsed = ?execution_time, number=?block_num_hash.number, "Executed block");

        // Ensure that prewarm tasks don't send proof messages after state root sender is dropped
        cancel_execution.cancel();

        if let Err(err) = self.consensus.validate_block_post_execution(&block, &output) {
            // call post-block hook
            self.invalid_block_hook.on_invalid_block(&parent_block, &block, &output, None);
            return Err(err.into())
        }

        let hashed_state = self.provider.hashed_post_state(&output.state);

        if let Err(err) = self
            .payload_validator
            .validate_block_post_execution_with_hashed_state(&hashed_state, &block)
        {
            // call post-block hook
            self.invalid_block_hook.on_invalid_block(&parent_block, &block, &output, None);
            return Err(err.into())
        }

        trace!(target: "engine::tree", block=?block_num_hash, "Calculating block state root");
        let root_time = Instant::now();

        // We attempt to compute state root in parallel if we are currently not persisting
        // anything to database. This is safe, because the database state cannot
        // change until we finish parallel computation. It is important that nothing
        // is being persisted as we are computing in parallel, because we initialize
        // a different database transaction per thread and it might end up with a
        // different view of the database.
        let (state_root, trie_output, root_elapsed) = if is_descendant_of_persisting_blocks {
            if self.config.use_state_root_task() {
                let state_root_handle = state_root_handle
                    .expect("state root handle must exist if legacy_state_root is false");
                let state_root_config = state_root_task_config.expect("task config is present");

                // Handle state root result from task using handle
                self.handle_state_root_result(
                    state_root_handle,
                    state_root_config,
                    block.sealed_block(),
                    &hashed_state,
                    &state_provider,
                    root_time,
                )?
            } else {
                match self.compute_state_root_parallel(block.header().parent_hash(), &hashed_state)
                {
                    Ok(result) => {
                        info!(
                            target: "engine::tree",
                            block = ?block_num_hash,
                            regular_state_root = ?result.0,
                            "Regular root task finished"
                        );
                        (result.0, result.1, root_time.elapsed())
                    }
                    Err(ParallelStateRootError::Provider(ProviderError::ConsistentView(error))) => {
                        debug!(target: "engine", %error, "Parallel state root computation failed consistency check, falling back");
                        let (root, updates) =
                            state_provider.state_root_with_updates(hashed_state.clone())?;
                        (root, updates, root_time.elapsed())
                    }
                    Err(error) => return Err(InsertBlockErrorKind::Other(Box::new(error))),
                }
            }
        } else {
            debug!(target: "engine::tree", block=?block_num_hash, ?is_descendant_of_persisting_blocks, "Failed to compute state root in parallel");
            let (root, updates) = state_provider.state_root_with_updates(hashed_state.clone())?;
            (root, updates, root_time.elapsed())
        };
        if state_root != block.header().state_root() {
            // call post-block hook
            self.invalid_block_hook.on_invalid_block(
                &parent_block,
                &block,
                &output,
                Some((&trie_output, state_root)),
            );
            return Err(ConsensusError::BodyStateRootDiff(
                GotExpected { got: state_root, expected: block.header().state_root() }.into(),
            )
            .into())
        }

        self.metrics.block_validation.record_state_root(&trie_output, root_elapsed.as_secs_f64());
        debug!(target: "engine::tree", ?root_elapsed, block=?block_num_hash, "Calculated state root");

        if self.config.use_caching_and_prewarming() {
            let save_cache_start = Instant::now();
            // this is the only place / thread a writer is acquired, so we would have already
            // crashed if we had a poisoned rwlock
            //
            // we use a lock here and in prewarming, so we do not save the cache if a prewarm task
            // is still running, since it would update the cache with stale data. It's unlikely that
            // prewarm tasks are still running at this point however
            drop(prewarm_task_lock.write().unwrap());
            // apply state updates to cache and save it (if saving was successful)
            self.most_recent_cache = state_provider.save_cache(block.hash(), &output.state).ok();
            let elapsed = save_cache_start.elapsed();

            // record how long it took to save caches
            self.metrics.block_validation.cache_saving_duration.set(elapsed.as_secs_f64());
        }

        let executed: ExecutedBlockWithTrieUpdates<N> = ExecutedBlockWithTrieUpdates {
            block: ExecutedBlock {
                recovered_block: Arc::new(block),
                execution_output: Arc::new(ExecutionOutcome::from((output, block_num_hash.number))),
                hashed_state: Arc::new(hashed_state),
            },
            trie: Arc::new(trie_output),
        };

        // if the parent is the canonical head, we can insert the block as the pending block
        if self.state.tree_state.canonical_block_hash() == executed.recovered_block().parent_hash()
        {
            debug!(target: "engine::tree", pending=?block_num_hash, "updating pending block");
            self.canonical_in_memory_state.set_pending_block(executed.clone());
        }

        self.state.tree_state.insert_executed(executed.clone());
        self.metrics.engine.executed_blocks.set(self.state.tree_state.block_count() as f64);

        // emit insert event
        let elapsed = start.elapsed();
        let engine_event = if self.is_fork(block_num_hash.hash)? {
            BeaconConsensusEngineEvent::ForkBlockAdded(executed, elapsed)
        } else {
            BeaconConsensusEngineEvent::CanonicalBlockAdded(executed, elapsed)
        };
        self.emit_event(EngineApiEvent::BeaconConsensus(engine_event));

        debug!(target: "engine::tree", block=?block_num_hash, "Finished inserting block");
        Ok(InsertPayloadOk::Inserted(BlockStatus::Valid))
    }
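The removed `insert_block_inner` coordinates prewarm tasks with two primitives: a cancellation flag (`ManualCancel`) so prewarm work stops once the real execution finishes, and an `RwLock` whose read guards are held by prewarm tasks while the single writer is taken just before the cache is saved. A std-only sketch of that pattern, with plain threads and an `AtomicBool` standing in for the thread pool and `ManualCancel`; all other names are illustrative.

    use std::{
        sync::{
            atomic::{AtomicBool, Ordering},
            Arc, RwLock,
        },
        thread,
        time::Duration,
    };

    fn main() {
        // Read guards are held by prewarm workers; the writer is acquired once, right before
        // saving the cache, so saving waits for all in-flight workers to finish.
        let prewarm_task_lock = Arc::new(RwLock::new(()));
        // Set once block execution is done; workers check it and bail out early.
        let cancel = Arc::new(AtomicBool::new(false));

        let mut handles = Vec::new();
        for tx_idx in 0u64..4 {
            let lock = Arc::clone(&prewarm_task_lock);
            let cancel = Arc::clone(&cancel);
            handles.push(thread::spawn(move || {
                let _in_progress = lock.read().unwrap();
                if cancel.load(Ordering::Relaxed) {
                    return; // execution already finished, skip the work
                }
                // ... speculative execution of transaction `tx_idx` would go here ...
                thread::sleep(Duration::from_millis(10 * tx_idx));
            }));
        }

        // Main execution finished: tell workers to stop, then wait for stragglers by
        // taking the write lock before "saving" the cache.
        cancel.store(true, Ordering::Relaxed);
        drop(prewarm_task_lock.write().unwrap());
        println!("safe to save the execution cache now");

        for handle in handles {
            handle.join().unwrap();
        }
    }

Taking the write lock only after setting the flag is what guarantees no prewarm task can still be mutating the caches when they are saved.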
    fn insert_block_inner2(
        &mut self,
        block: RecoveredBlock<N::Block>,
    ) -> Result<InsertPayloadOk, InsertBlockErrorKind> {
        let block_num_hash = block.num_hash();
        debug!(target: "engine::tree", block=?block_num_hash, parent = ?block.parent_hash(), state_root = ?block.state_root(), "Inserting new block into tree");

        if self.block_by_hash(block.hash())?.is_some() {
            return Ok(InsertPayloadOk::AlreadySeen(BlockStatus::Valid))
        }

        let start = Instant::now();

        trace!(target: "engine::tree", block=?block_num_hash, "Validating block consensus");

        // validate block consensus rules
        self.validate_block(&block)?;

        trace!(target: "engine::tree", block=?block_num_hash, parent=?block.parent_hash(), "Fetching block state provider");
        let Some(provider_builder) = self.state_provider_builder(block.parent_hash())? else {
            // we don't have the state required to execute this block, buffering it and find the
@@ -2950,144 +2567,6 @@ where
        Ok(input)
    }

    /// Runs execution for a single transaction, spawning it in the prewarm threadpool.
    #[allow(clippy::too_many_arguments)]
    fn prewarm_transaction(
        &self,
        block: N::BlockHeader,
        tx: Recovered<N::SignedTx>,
        caches: ProviderCaches,
        cache_metrics: CachedStateMetrics,
        state_root_sender: Option<Sender<StateRootMessage>>,
        cancel_execution: ManualCancel,
        task_finished: Arc<RwLock<()>>,
        metrics: PrewarmThreadMetrics,
    ) -> Result<(), InsertBlockErrorKind> {
        // Get the builder once, outside the thread
        let Some(state_provider_builder) = self.state_provider_builder(block.parent_hash())? else {
            trace!(
                target: "engine::tree",
                parent=%block.parent_hash(),
                "Could not get state provider builder for prewarm",
            );
            return Ok(())
        };
        // clone and copy info required for execution
        let evm_config = self.evm_config.clone();

        // spawn task executing the individual tx
        self.thread_pool.spawn(move || {
            let thread_start = Instant::now();
            let in_progress = task_finished.read().unwrap();

            // Create the state provider inside the thread
            let state_provider = match state_provider_builder.build() {
                Ok(provider) => provider,
                Err(err) => {
                    trace!(
                        target: "engine::tree",
                        %err,
                        "Failed to build state provider in prewarm thread"
                    );
                    return
                }
            };

            // Use the caches to create a new provider with caching
            let state_provider =
                CachedStateProvider::new_with_caches(state_provider, caches, cache_metrics);

            let state_provider = StateProviderDatabase::new(&state_provider);

            let mut evm_env = evm_config.evm_env(&block);

            evm_env.cfg_env.disable_nonce_check = true;

            // create a new executor and disable nonce checks in the env
            let mut evm = evm_config.evm_with_env(state_provider, evm_env);

            // create the tx env and reset nonce
            let tx_env = evm_config.tx_env(&tx);

            // exit early if execution is done
            if cancel_execution.is_cancelled() {
                return
            }

            let execution_start = Instant::now();
            let res = match evm.transact(tx_env) {
                Ok(res) => res,
                Err(err) => {
                    trace!(
                        target: "engine::tree",
                        %err,
                        tx_hash=%tx.tx_hash(),
                        sender=%tx.signer(),
                        "Error when executing prewarm transaction",
                    );
                    return
                }
            };
            metrics.execution_duration.record(execution_start.elapsed());

            // execution no longer in progress, so we can drop the lock
            drop(in_progress);

            // if execution is finished there is no point to sending proof targets
            if cancel_execution.is_cancelled() {
                return
            }

            let Some(state_root_sender) = state_root_sender else { return };

            let mut targets = MultiProofTargets::with_capacity(res.state.len());
            let mut storage_targets = 0;
            for (addr, account) in res.state {
                // if the account was not touched, or if the account was selfdestructed, do not
                // fetch proofs for it
                //
                // Since selfdestruct can only happen in the same transaction, we can skip
                // prefetching proofs for selfdestructed accounts
                //
                // See: https://eips.ethereum.org/EIPS/eip-6780
                if !account.is_touched() || account.is_selfdestructed() {
                    continue
                }

                let mut storage_set =
                    B256Set::with_capacity_and_hasher(account.storage.len(), Default::default());
                for (key, slot) in account.storage {
                    // do nothing if unchanged
                    if !slot.is_changed() {
                        continue
                    }

                    storage_set.insert(keccak256(B256::new(key.to_be_bytes())));
                }

                storage_targets += storage_set.len();
                targets.insert(keccak256(addr), storage_set);
            }

            debug!(
                target: "engine::tree",
                tx_hash = ?tx.tx_hash(),
                targets = targets.len(),
                storage_targets,
                "Prefetching proofs for a transaction"
            );
            metrics.prefetch_storage_targets.record(storage_targets as f64);

            let _ = state_root_sender.send(StateRootMessage::PrefetchProofs(targets));

            // record final metrics
            metrics.total_runtime.record(thread_start.elapsed());
        });

        Ok(())
    }
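The tail of `prewarm_transaction` turns the execution result into multiproof prefetch targets: accounts that were not touched or that selfdestructed are skipped (under EIP-6780 a selfdestruct only takes effect within the creating transaction), only changed storage slots are kept, and addresses and slot keys are hashed before being sent to the state root task. A simplified std-only sketch of that filtering; the `touched`/`selfdestructed`/`changed` flags and the `hash32` placeholder are illustrative, not revm's API, and the real code hashes with `keccak256`.

    use std::collections::{HashMap, HashSet};

    // Illustrative stand-ins for revm's per-account execution state.
    struct SlotSketch { changed: bool, key: [u8; 32] }
    struct AccountSketch { touched: bool, selfdestructed: bool, storage: Vec<SlotSketch> }

    /// Placeholder for keccak256; only here so the sketch is self-contained.
    fn hash32(bytes: &[u8]) -> [u8; 32] {
        let mut out = [0u8; 32];
        for (i, b) in bytes.iter().enumerate() {
            out[i % 32] ^= *b;
        }
        out
    }

    fn collect_prefetch_targets(
        state: HashMap<[u8; 20], AccountSketch>,
    ) -> HashMap<[u8; 32], HashSet<[u8; 32]>> {
        let mut targets = HashMap::new();
        for (addr, account) in state {
            // skip accounts that were not touched or were selfdestructed (EIP-6780)
            if !account.touched || account.selfdestructed {
                continue;
            }
            // keep only storage slots that actually changed
            let slots: HashSet<[u8; 32]> = account
                .storage
                .iter()
                .filter(|slot| slot.changed)
                .map(|slot| hash32(&slot.key))
                .collect();
            targets.insert(hash32(&addr), slots);
        }
        targets
    }

    fn main() {
        let mut state = HashMap::new();
        state.insert(
            [1u8; 20],
            AccountSketch {
                touched: true,
                selfdestructed: false,
                storage: vec![SlotSketch { changed: true, key: [7u8; 32] }],
            },
        );
        let targets = collect_prefetch_targets(state);
        assert_eq!(targets.len(), 1);
    }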
    /// Handles an error that occurred while inserting a block.
    ///
    /// If this is a validation error this will mark the block as invalid.
@@ -3126,75 +2605,6 @@ where
        ))
    }
    /// Waits for the result on the input [`StateRootHandle`], and handles it, falling back to
    /// the hash builder-based state root calculation if it fails.
    fn handle_state_root_result(
        &self,
        state_root_handle: StateRootHandle,
        state_root_task_config: StateRootConfig<P>,
        sealed_block: &SealedBlock<N::Block>,
        hashed_state: &HashedPostState,
        state_provider: impl StateRootProvider,
        root_time: Instant,
    ) -> Result<(B256, TrieUpdates, Duration), InsertBlockErrorKind> {
        match state_root_handle.wait_for_result() {
            Ok(StateRootComputeOutcome {
                state_root: (task_state_root, task_trie_updates),
                time_from_last_update,
                ..
            }) => {
                info!(
                    target: "engine::tree",
                    block = ?sealed_block.num_hash(),
                    ?task_state_root,
                    task_elapsed = ?time_from_last_update,
                    "State root task finished"
                );

                if task_state_root != sealed_block.header().state_root() ||
                    self.config.always_compare_trie_updates()
                {
                    if task_state_root != sealed_block.header().state_root() {
                        debug!(target: "engine::tree", "Task state root does not match block state root");
                    }

                    let (regular_root, regular_updates) =
                        state_provider.state_root_with_updates(hashed_state.clone())?;

                    if regular_root == sealed_block.header().state_root() {
                        let provider_ro = state_root_task_config.consistent_view.provider_ro()?;
                        let in_memory_trie_cursor = InMemoryTrieCursorFactory::new(
                            DatabaseTrieCursorFactory::new(provider_ro.tx_ref()),
                            &state_root_task_config.nodes_sorted,
                        );
                        compare_trie_updates(
                            in_memory_trie_cursor,
                            task_trie_updates.clone(),
                            regular_updates.clone(),
                        )
                        .map_err(ProviderError::from)?;
                        if task_state_root != sealed_block.header().state_root() {
                            return Ok((regular_root, regular_updates, time_from_last_update));
                        }
                    } else {
                        debug!(target: "engine::tree",
                            "Regular state root does not match block state root"
                        );
                    }
                }

                Ok((task_state_root, task_trie_updates, time_from_last_update))
            }
            Err(error) => {
                info!(target: "engine::tree", ?error, "Failed to wait for state root task result");
                // Fall back to sequential calculation
                let (root, updates) =
                    state_provider.state_root_with_updates(hashed_state.clone())?;
                Ok((root, updates, root_time.elapsed()))
            }
        }
    }

    /// Attempts to find the header for the given block hash if it is canonical.
    pub fn find_canonical_header(
        &self,
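`handle_state_root_result` is a defensive fallback: the parallel task's root is accepted when it matches the block header, and otherwise the root is recomputed with the sequential hash-builder path (in the real code the two sets of trie updates are also compared). A minimal sketch of that decision flow; the error type and the fallback closure are placeholders, not reth APIs, and the trie-update comparison is collapsed into the fallback.

    /// Outcome of the (possibly parallel) state root task: the root it computed, or an error.
    type TaskResult = Result<[u8; 32], String>;

    /// Accept the task's root only when it matches the root committed in the block header;
    /// otherwise recompute sequentially with the provided fallback.
    fn resolve_state_root(
        task_result: TaskResult,
        expected_root: [u8; 32],
        sequential_fallback: impl FnOnce() -> [u8; 32],
    ) -> [u8; 32] {
        match task_result {
            Ok(root) if root == expected_root => root,
            Ok(_) | Err(_) => sequential_fallback(),
        }
    }

    fn main() {
        let expected = [3u8; 32];

        // Task agreed with the header: keep its result.
        assert_eq!(resolve_state_root(Ok([3u8; 32]), expected, || [9u8; 32]), expected);

        // Task failed (or disagreed): recompute with the sequential fallback.
        assert_eq!(resolve_state_root(Err("task died".into()), expected, || expected), expected);
    }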