refactor: introduce payload processor (#14589)

Co-authored-by: Federico Gimenez <fgimenez@users.noreply.github.com>
Authored by Matthias Seitz on 2025-02-27 15:12:24 +01:00, committed by GitHub
parent 08aee60143
commit e92a6a3472
10 changed files with 2431 additions and 14 deletions

Cargo.lock generated
View File

@@ -7432,6 +7432,7 @@ dependencies = [
"futures",
"metrics",
"mini-moka",
"parking_lot",
"proptest",
"rand 0.8.5",
"rayon",

View File

@@ -46,7 +46,7 @@ revm-primitives.workspace = true
# common
futures.workspace = true
thiserror.workspace = true
tokio = { workspace = true, features = ["macros", "sync"] }
tokio = { workspace = true, features = ["rt", "rt-multi-thread", "sync", "macros"] }
mini-moka = { workspace = true, features = ["sync"] }
# metrics
@@ -58,6 +58,7 @@ schnellru.workspace = true
rayon.workspace = true
tracing.workspace = true
derive_more.workspace = true
parking_lot.workspace = true
# optional deps for test-utils
reth-prune-types = { workspace = true, optional = true }

View File

@@ -67,15 +67,11 @@ impl<S> CachedStateProvider<S> {
caches.insert_state(state_updates)?;
// set metrics
metrics.storage_cache_size.set(caches.total_storage_slots() as f64);
metrics.account_cache_size.set(caches.account_cache.entry_count() as f64);
metrics.code_cache_size.set(caches.code_cache.entry_count() as f64);
debug!(target: "engine::caching", update_latency=?start.elapsed(), "Updated state caches");
// create a saved cache with the executed block hash, same metrics, and updated caches
let saved_cache = SavedCache { hash: executed_block_hash, caches, metrics };
saved_cache.update_metrics();
debug!(target: "engine::caching", update_latency=?start.elapsed(), "Updated state caches");
Ok(saved_cache)
}
@@ -483,7 +479,7 @@ impl Default for ProviderCacheBuilder {
/// A saved cache that has been used for executing a specific block and that has been updated with
/// the results of that execution.
#[derive(Debug)]
#[derive(Debug, Clone)]
pub(crate) struct SavedCache {
/// The hash of the block these caches were used to execute.
hash: B256,
@@ -496,6 +492,15 @@ pub(crate) struct SavedCache {
}
impl SavedCache {
/// Creates a new instance with the internals
pub(super) const fn new(
hash: B256,
caches: ProviderCaches,
metrics: CachedStateMetrics,
) -> Self {
Self { hash, caches, metrics }
}
/// Returns the hash for this cache
pub(crate) const fn executed_block_hash(&self) -> B256 {
self.hash
@@ -505,6 +510,18 @@ impl SavedCache {
pub(crate) fn split(self) -> (ProviderCaches, CachedStateMetrics) {
(self.caches, self.metrics)
}
/// Returns the [`ProviderCaches`] belonging to the tracked hash.
pub(crate) fn cache(&self) -> &ProviderCaches {
&self.caches
}
/// Updates the metrics for the [`ProviderCaches`].
pub(crate) fn update_metrics(&self) {
self.metrics.storage_cache_size.set(self.caches.total_storage_slots() as f64);
self.metrics.account_cache_size.set(self.caches.account_cache.entry_count() as f64);
self.metrics.code_cache_size.set(self.caches.code_cache.entry_count() as f64);
}
}
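As a usage sketch only (not part of this diff, and assuming `new_with_caches` is available for any `S`, as the generic impl above suggests), `split` is what lets a caller hand the caches and their metric handles to a `CachedStateProvider`:

// Hypothetical helper: wrap a plain state provider with a previously saved cache.
fn with_saved_cache<S>(saved: SavedCache, state_provider: S) -> CachedStateProvider<S> {
    // Split the saved cache into the shared caches and their metric handles.
    let (caches, metrics) = saved.split();
    // The wrapper serves reads from the caches and records hits/misses in the metrics.
    CachedStateProvider::new_with_caches(state_provider, caches, metrics)
}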
/// Cache for an account's storage slots

View File

@@ -85,11 +85,16 @@ pub mod error;
mod invalid_block_hook;
mod invalid_headers;
mod metrics;
mod payload_processor;
mod persistence_state;
pub mod root;
mod trie_updates;
use crate::tree::{config::MIN_BLOCKS_FOR_PIPELINE_RUN, error::AdvancePersistenceError};
use crate::tree::{
config::MIN_BLOCKS_FOR_PIPELINE_RUN,
error::AdvancePersistenceError,
payload_processor::{executor::WorkloadExecutor, PayloadProcessor},
};
pub use block_buffer::BlockBuffer;
pub use config::TreeConfig;
pub use invalid_block_hook::{InvalidBlockHooks, NoopInvalidBlockHook};
@@ -601,6 +606,9 @@ where
invalid_block_hook: Box<dyn InvalidBlockHook<N>>,
/// The engine API variant of this handler
engine_kind: EngineApiKind,
/// The type responsible for processing new payloads
payload_processor: PayloadProcessor<N, C>,
/// The most recent cache used for execution.
most_recent_cache: Option<SavedCache>,
/// Thread pool used for the state root task and prewarming
@@ -681,6 +689,9 @@ where
.expect("Failed to create proof worker thread pool"),
);
let payload_processor =
PayloadProcessor::new(WorkloadExecutor::new(), evm_config.clone(), &config);
Self {
provider,
executor_provider,
@@ -700,6 +711,7 @@ where
incoming_tx,
invalid_block_hook: Box::new(NoopInvalidBlockHook),
engine_kind,
payload_processor,
most_recent_cache: None,
thread_pool,
}
@@ -2341,10 +2353,11 @@ where
&mut self,
block: RecoveredBlock<N::Block>,
) -> Result<InsertPayloadOk, InsertBlockError<N::Block>> {
self.insert_block_inner(block.clone())
self.insert_block_inner2(block.clone())
.map_err(|kind| InsertBlockError::new(block.into_sealed_block(), kind))
}
#[allow(unused)]
fn insert_block_inner(
&mut self,
block: RecoveredBlock<N::Block>,
@@ -2650,6 +2663,242 @@ where
Ok(InsertPayloadOk::Inserted(BlockStatus::Valid))
}
#[allow(unused)]
fn insert_block_inner2(
&mut self,
block: RecoveredBlock<N::Block>,
) -> Result<InsertPayloadOk, InsertBlockErrorKind> {
let block_num_hash = block.num_hash();
debug!(target: "engine::tree", block=?block_num_hash, parent = ?block.parent_hash(), state_root = ?block.state_root(), "Inserting new block into tree");
if self.block_by_hash(block.hash())?.is_some() {
return Ok(InsertPayloadOk::AlreadySeen(BlockStatus::Valid))
}
let start = Instant::now();
trace!(target: "engine::tree", block=?block_num_hash, "Validating block consensus");
// validate block consensus rules
self.validate_block(&block)?;
trace!(target: "engine::tree", block=?block_num_hash, parent=?block.parent_hash(), "Fetching block state provider");
let Some(state_provider) = self.state_provider(block.parent_hash())? else {
// we don't have the state required to execute this block, so we buffer it and find the
// missing parent block
let missing_ancestor = self
.state
.buffer
.lowest_ancestor(&block.parent_hash())
.map(|block| block.parent_num_hash())
.unwrap_or_else(|| block.parent_num_hash());
self.state.buffer.insert_block(block);
return Ok(InsertPayloadOk::Inserted(BlockStatus::Disconnected {
head: self.state.tree_state.current_canonical_head,
missing_ancestor,
}))
};
// now validate against the parent
let parent_block = self.sealed_header_by_hash(block.parent_hash())?.ok_or_else(|| {
InsertBlockErrorKind::Provider(ProviderError::HeaderNotFound(
block.parent_hash().into(),
))
})?;
if let Err(e) =
self.consensus.validate_header_against_parent(block.sealed_header(), &parent_block)
{
warn!(target: "engine::tree", ?block, "Failed to validate header {} against parent: {e}", block.hash());
return Err(e.into())
}
// We only run the parallel state root if we are currently persisting blocks that are all
// ancestors of the one we are executing. If we're committing ancestor blocks, then: any
// trie updates being committed are a subset of the in-memory trie updates collected before
// fetching reverts. So any diff in reverts (pre vs post commit) is already covered by the
// in-memory trie updates we collect in `compute_state_root_parallel`.
//
// See https://github.com/paradigmxyz/reth/issues/12688 for more details
let is_descendant_of_persisting_blocks =
self.is_descendant_of_persisting_blocks(block.header());
let Some(provider_builder) = self.state_provider_builder(block.parent_hash())? else {
// TODO(mattsse): this is the same logic as the `state_provider` call above and should
// be unified
unreachable!()
};
// use prewarming background task
let header = block.clone_sealed_header();
let txs = block.clone_transactions_recovered().collect();
let mut handle = if is_descendant_of_persisting_blocks && self.config.use_state_root_task()
{
// use background tasks for state root calc
let consistent_view = ConsistentDbView::new_with_latest_tip(self.provider.clone())?;
// Compute trie input
let trie_input_start = Instant::now();
let trie_input = self
.compute_trie_input(consistent_view.clone(), block.header().parent_hash())
.map_err(|e| InsertBlockErrorKind::Other(Box::new(e)))?;
self.metrics
.block_validation
.trie_input_duration
.set(trie_input_start.elapsed().as_secs_f64());
self.payload_processor.spawn(header, txs, provider_builder, consistent_view, trie_input)
} else {
self.payload_processor.spawn_cache_exclusive(header, txs, provider_builder)
};
// Use cached state provider before executing, used in execution after prewarming threads
// complete
let state_provider = CachedStateProvider::new_with_caches(
state_provider,
handle.caches(),
handle.cache_metrics(),
);
trace!(target: "engine::tree", block=?block_num_hash, "Executing block");
let executor = self.executor_provider.executor(StateProviderDatabase::new(&state_provider));
let execution_start = Instant::now();
let output = self.metrics.executor.execute_metered(
executor,
&block,
Box::new(handle.state_hook()),
)?;
let execution_time = execution_start.elapsed();
trace!(target: "engine::tree", elapsed = ?execution_time, number=?block_num_hash.number, "Executed block");
// after executing the block we can stop executing transactions
handle.stop_prewarming_execution();
if let Err(err) = self.consensus.validate_block_post_execution(&block, &output) {
// call post-block hook
self.invalid_block_hook.on_invalid_block(&parent_block, &block, &output, None);
return Err(err.into())
}
let hashed_state = self.provider.hashed_post_state(&output.state);
if let Err(err) = self
.payload_validator
.validate_block_post_execution_with_hashed_state(&hashed_state, &block)
{
// call post-block hook
self.invalid_block_hook.on_invalid_block(&parent_block, &block, &output, None);
return Err(err.into())
}
trace!(target: "engine::tree", block=?block_num_hash, "Calculating block state root");
let root_time = Instant::now();
// (state_root, trie_output, root_elapsed)
let mut maybe_state_root = None;
if is_descendant_of_persisting_blocks {
// if the new payload extends the current canonical chain we attempt to use the
// background task or try to compute it in parallel
if self.config.use_state_root_task() {
match handle.state_root() {
Ok(res) => {
// we double check the state root here for good measure
// TODO: clean this up
if res.state_root.0 == block.header().state_root() {
maybe_state_root =
Some((res.state_root.0, res.state_root.1, res.total_time))
}
}
Err(error) => {
debug!(target: "engine", %error, "Background parallel state root computation failed");
}
}
} else {
match self.compute_state_root_parallel(block.header().parent_hash(), &hashed_state)
{
Ok(result) => {
info!(
target: "engine::tree",
block = ?block_num_hash,
regular_state_root = ?result.0,
"Regular root task finished"
);
maybe_state_root = Some((result.0, result.1, root_time.elapsed()));
}
Err(ParallelStateRootError::Provider(ProviderError::ConsistentView(error))) => {
debug!(target: "engine", %error, "Parallel state root computation failed consistency check, falling back");
}
Err(error) => return Err(InsertBlockErrorKind::Other(Box::new(error))),
}
}
}
let (state_root, trie_output, root_elapsed) = if let Some(maybe_state_root) =
maybe_state_root
{
maybe_state_root
} else {
// the fallback is to compute the state root in the regular, synchronous way
debug!(target: "engine::tree", block=?block_num_hash, ?is_descendant_of_persisting_blocks, "Failed to compute state root in parallel");
let (root, updates) = state_provider.state_root_with_updates(hashed_state.clone())?;
(root, updates, root_time.elapsed())
};
// ensure state root matches
if state_root != block.header().state_root() {
// call post-block hook
self.invalid_block_hook.on_invalid_block(
&parent_block,
&block,
&output,
Some((&trie_output, state_root)),
);
return Err(ConsensusError::BodyStateRootDiff(
GotExpected { got: state_root, expected: block.header().state_root() }.into(),
)
.into())
}
// terminate prewarming task with good state output
handle.terminate_caching(Some(output.state.clone()));
let executed: ExecutedBlockWithTrieUpdates<N> = ExecutedBlockWithTrieUpdates {
block: ExecutedBlock {
recovered_block: Arc::new(block),
execution_output: Arc::new(ExecutionOutcome::from((output, block_num_hash.number))),
hashed_state: Arc::new(hashed_state),
},
trie: Arc::new(trie_output),
};
// if the parent is the canonical head, we can insert the block as the pending block
if self.state.tree_state.canonical_block_hash() == executed.recovered_block().parent_hash()
{
debug!(target: "engine::tree", pending=?block_num_hash, "updating pending block");
self.canonical_in_memory_state.set_pending_block(executed.clone());
}
self.state.tree_state.insert_executed(executed.clone());
self.metrics.engine.executed_blocks.set(self.state.tree_state.block_count() as f64);
// emit insert event
let elapsed = start.elapsed();
let engine_event = if self.is_fork(block_num_hash.hash)? {
BeaconConsensusEngineEvent::ForkBlockAdded(executed, elapsed)
} else {
BeaconConsensusEngineEvent::CanonicalBlockAdded(executed, elapsed)
};
self.emit_event(EngineApiEvent::BeaconConsensus(engine_event));
debug!(target: "engine::tree", block=?block_num_hash, "Finished inserting block");
Ok(InsertPayloadOk::Inserted(BlockStatus::Valid))
}
/// Compute state root for the given hashed post state in parallel.
///
/// # Returns
@@ -4227,8 +4476,8 @@ mod tests {
);
}
#[tokio::test]
async fn test_tree_state_on_new_head_deep_fork() {
#[test]
fn test_tree_state_on_new_head_deep_fork() {
reth_tracing::init_test_tracing();
let chain_spec = MAINNET.clone();

View File

@@ -0,0 +1,71 @@
use rayon::ThreadPool as RayonPool;
use std::sync::{Arc, OnceLock};
use tokio::{
runtime::{Handle, Runtime},
task::JoinHandle,
};
/// An executor for mixed I/O and CPU workloads.
///
/// This type has access to its own rayon pool and uses tokio to spawn blocking tasks.
///
/// It will reuse an existing tokio runtime if available or create its own.
#[derive(Debug, Clone)]
pub(crate) struct WorkloadExecutor {
inner: WorkloadExecutorInner,
}
impl WorkloadExecutor {
/// Creates a new instance with default settings.
pub(crate) fn new() -> Self {
Self { inner: WorkloadExecutorInner::new(rayon::ThreadPoolBuilder::new().build().unwrap()) }
}
/// Creates a new executor with the given number of threads for cpu bound work (rayon).
#[allow(unused)]
pub(super) fn with_num_cpu_threads(cpu_threads: usize) -> Self {
Self {
inner: WorkloadExecutorInner::new(
rayon::ThreadPoolBuilder::new().num_threads(cpu_threads).build().unwrap(),
),
}
}
/// Shorthand for [`Runtime::spawn_blocking`]
#[track_caller]
pub(super) fn spawn_blocking<F, R>(&self, func: F) -> JoinHandle<R>
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
self.inner.handle.spawn_blocking(func)
}
/// Returns access to the rayon pool
pub(super) fn rayon_pool(&self) -> &Arc<rayon::ThreadPool> {
&self.inner.rayon_pool
}
}
#[derive(Debug, Clone)]
struct WorkloadExecutorInner {
handle: Handle,
rayon_pool: Arc<RayonPool>,
}
impl WorkloadExecutorInner {
fn new(rayon_pool: rayon::ThreadPool) -> Self {
fn get_runtime_handle() -> Handle {
Handle::try_current().unwrap_or_else(|_| {
// Create a new runtime if no runtime is available
static RT: OnceLock<Runtime> = OnceLock::new();
let rt = RT.get_or_init(|| Runtime::new().unwrap());
rt.handle().clone()
})
}
Self { handle: get_runtime_handle(), rayon_pool: Arc::new(rayon_pool) }
}
}
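A brief usage sketch (illustrative only, assuming a caller inside the same crate): blocking work goes through tokio's blocking pool via `spawn_blocking`, while CPU-bound work can be dispatched onto the executor's private rayon pool.

// Hypothetical caller demonstrating the two execution paths.
fn run_mixed_workload(executor: &WorkloadExecutor) {
    // Blocking / IO-flavoured work is handed to tokio's blocking thread pool.
    let io_handle = executor.spawn_blocking(|| {
        // e.g. read a file or query a database here
        "io done"
    });

    // CPU-bound work runs on the dedicated rayon pool owned by the executor.
    let sum: u64 = executor.rayon_pool().install(|| (0..1_000u64).sum());

    // `io_handle` is a tokio JoinHandle and can be awaited from async code.
    let _ = (io_handle, sum);
}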

View File

@@ -0,0 +1,377 @@
//! Entrypoint for payload processing.
use crate::tree::{
cached_state::{CachedStateMetrics, ProviderCacheBuilder, ProviderCaches, SavedCache},
payload_processor::{
executor::WorkloadExecutor,
prewarm::{PrewarmCacheTask, PrewarmContext, PrewarmTaskEvent},
sparse_trie::{SparseTrieTask, StateRootComputeOutcome},
},
StateProviderBuilder, TreeConfig,
};
use alloy_consensus::{transaction::Recovered, BlockHeader};
use alloy_primitives::B256;
use multiproof::*;
use parking_lot::RwLock;
use reth_evm::{
system_calls::{OnStateHook, StateChangeSource},
ConfigureEvm, ConfigureEvmEnvFor,
};
use reth_primitives_traits::{NodePrimitives, SealedHeaderFor};
use reth_provider::{
providers::ConsistentDbView, BlockReader, DatabaseProviderFactory, StateCommitmentProvider,
StateProviderFactory, StateReader,
};
use reth_revm::{db::BundleState, state::EvmState};
use reth_trie::TrieInput;
use reth_trie_parallel::root::ParallelStateRootError;
use std::{
collections::VecDeque,
sync::{
mpsc,
mpsc::{channel, Sender},
Arc,
},
};
pub(crate) mod executor;
mod multiproof;
mod prewarm;
mod sparse_trie;
/// Entrypoint for executing the payload.
pub(super) struct PayloadProcessor<N, Evm> {
/// The executor used to spawn tasks.
executor: WorkloadExecutor,
/// The most recent cache used for execution.
execution_cache: ExecutionCache,
/// Metrics for trie operations
trie_metrics: MultiProofTaskMetrics,
/// Cross-block cache size in bytes.
cross_block_cache_size: u64,
/// Whether transactions should be executed on prewarming task.
use_transaction_prewarming: bool,
/// Determines how to configure the evm for execution.
evm_config: Evm,
_marker: std::marker::PhantomData<N>,
}
impl<N, Evm> PayloadProcessor<N, Evm> {
pub(super) fn new(executor: WorkloadExecutor, evm_config: Evm, config: &TreeConfig) -> Self {
Self {
executor,
execution_cache: Default::default(),
trie_metrics: Default::default(),
cross_block_cache_size: config.cross_block_cache_size(),
use_transaction_prewarming: config.use_caching_and_prewarming(),
evm_config,
_marker: Default::default(),
}
}
}
impl<N, Evm> PayloadProcessor<N, Evm>
where
N: NodePrimitives,
Evm: ConfigureEvmEnvFor<N>
+ 'static
+ ConfigureEvm<Header = N::BlockHeader, Transaction = N::SignedTx>
+ 'static,
{
/// Spawns all background tasks and returns a handle connected to the tasks.
///
/// - Transaction prewarming task
/// - State root task
/// - Sparse trie task
///
/// # Transaction prewarming task
///
/// Responsible for feeding state updates to the multi proof task.
///
/// This task runs until:
/// - externally cancelled (e.g. sequential block execution is complete)
///
/// ## Multi proof task
///
/// Responsible for preparing sparse trie messages for the sparse trie task.
/// A state update (e.g. tx output) is converted into a multiproof calculation that returns an
/// output back to this task.
///
/// Receives updates from sequential execution.
/// This task runs until it receives a shutdown signal, which should be after the block
/// was fully executed.
///
/// ## Sparse trie task
///
/// Responsible for calculating the state root based on the received [`SparseTrieUpdate`].
///
/// This task runs until there are no further updates to process.
///
/// This returns a handle to await the final state root and to interact with the tasks (e.g.
/// canceling).
pub(super) fn spawn<P>(
&self,
header: SealedHeaderFor<N>,
transactions: VecDeque<Recovered<N::SignedTx>>,
provider_builder: StateProviderBuilder<N, P>,
consistent_view: ConsistentDbView<P>,
trie_input: TrieInput,
) -> PayloadHandle
where
P: DatabaseProviderFactory<Provider: BlockReader>
+ BlockReader
+ StateProviderFactory
+ StateReader
+ StateCommitmentProvider
+ Clone
+ 'static,
{
let (to_sparse_trie, sparse_trie_rx) = channel();
// spawn multiproof task
let state_root_config = MultiProofConfig::new_from_input(consistent_view, trie_input);
let multi_proof_task =
MultiProofTask::new(state_root_config.clone(), self.executor.clone(), to_sparse_trie);
// wire the multiproof task to the prewarm task
let to_multi_proof = Some(multi_proof_task.state_root_message_sender());
let prewarm_handle =
self.spawn_caching_with(header, transactions, provider_builder, to_multi_proof.clone());
// spawn multi-proof task
self.executor.spawn_blocking(move || {
multi_proof_task.run();
});
let sparse_trie_task = SparseTrieTask {
executor: self.executor.clone(),
updates: sparse_trie_rx,
config: state_root_config,
metrics: self.trie_metrics.clone(),
};
// wire the sparse trie to the state root response receiver
let (state_root_tx, state_root_rx) = channel();
self.executor.spawn_blocking(move || {
let res = sparse_trie_task.run();
let _ = state_root_tx.send(res);
});
PayloadHandle { to_multi_proof, prewarm_handle, state_root: Some(state_root_rx) }
}
/// Spawn cache prewarming exclusively.
///
/// Returns a [`PayloadHandle`] to communicate with the task.
pub(super) fn spawn_cache_exclusive<P>(
&self,
header: SealedHeaderFor<N>,
transactions: VecDeque<Recovered<N::SignedTx>>,
provider_builder: StateProviderBuilder<N, P>,
) -> PayloadHandle
where
P: BlockReader
+ StateProviderFactory
+ StateReader
+ StateCommitmentProvider
+ Clone
+ 'static,
{
let prewarm_handle = self.spawn_caching_with(header, transactions, provider_builder, None);
PayloadHandle { to_multi_proof: None, prewarm_handle, state_root: None }
}
/// Spawn prewarming optionally wired to the multiproof task for target updates.
fn spawn_caching_with<P>(
&self,
header: SealedHeaderFor<N>,
mut transactions: VecDeque<Recovered<N::SignedTx>>,
provider_builder: StateProviderBuilder<N, P>,
to_multi_proof: Option<Sender<MultiProofMessage>>,
) -> CacheTaskHandle
where
P: BlockReader
+ StateProviderFactory
+ StateReader
+ StateCommitmentProvider
+ Clone
+ 'static,
{
if !self.use_transaction_prewarming {
// if no transactions should be executed we clear them but still spawn the task for
// caching updates
transactions.clear();
}
let (cache, cache_metrics) = self.cache_for(header.parent_hash()).split();
// configure prewarming
let prewarm_ctx = PrewarmContext {
header,
evm_config: self.evm_config.clone(),
cache: cache.clone(),
cache_metrics: cache_metrics.clone(),
provider: provider_builder,
};
let prewarm_task = PrewarmCacheTask::new(
self.executor.clone(),
self.execution_cache.clone(),
prewarm_ctx,
to_multi_proof,
transactions,
);
let to_prewarm_task = prewarm_task.actions_tx();
// spawn pre-warm task
self.executor.spawn_blocking(move || {
prewarm_task.run();
});
CacheTaskHandle { cache, to_prewarm_task: Some(to_prewarm_task), cache_metrics }
}
/// Returns the cache for the given parent hash.
///
/// If the given hash is different from what was most recently cached, this will create a new
/// instance.
fn cache_for(&self, parent_hash: B256) -> SavedCache {
self.execution_cache.get_cache_for(parent_hash).unwrap_or_else(|| {
let cache = ProviderCacheBuilder::default().build_caches(self.cross_block_cache_size);
SavedCache::new(parent_hash, cache, CachedStateMetrics::zeroed())
})
}
}
/// Handle to all the spawned tasks.
pub(super) struct PayloadHandle {
/// Channel for evm state updates
to_multi_proof: Option<Sender<MultiProofMessage>>,
// must include the receiver of the state root wired to the sparse trie
prewarm_handle: CacheTaskHandle,
/// Receiver for the state root
state_root: Option<mpsc::Receiver<Result<StateRootComputeOutcome, ParallelStateRootError>>>,
}
impl PayloadHandle {
/// Awaits the state root
///
/// # Panics
///
/// If payload processing was started without background tasks.
pub(super) fn state_root(&mut self) -> Result<StateRootComputeOutcome, ParallelStateRootError> {
self.state_root
.take()
.expect("state_root is None")
.recv()
.map_err(|_| ParallelStateRootError::Other("sparse trie task dropped".to_string()))?
}
/// Returns a state hook to be used to send state updates to this task.
///
/// If a multiproof task is spawned the hook will notify it about new states.
pub(super) fn state_hook(&self) -> impl OnStateHook {
// convert the channel into a `StateHookSender` that emits an event on drop
let to_multi_proof = self.to_multi_proof.clone().map(StateHookSender::new);
move |source: StateChangeSource, state: &EvmState| {
if let Some(sender) = &to_multi_proof {
let _ = sender.send(MultiProofMessage::StateUpdate(source, state.clone()));
}
}
}
/// Returns a clone of the caches used by prewarming
pub(super) fn caches(&self) -> ProviderCaches {
self.prewarm_handle.cache.clone()
}
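/// Returns a clone of the cache metrics used by prewarming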
pub(super) fn cache_metrics(&self) -> CachedStateMetrics {
self.prewarm_handle.cache_metrics.clone()
}
/// Terminates the pre-warming transaction processing.
///
/// Note: This does not terminate the task yet.
pub(super) fn stop_prewarming_execution(&self) {
self.prewarm_handle.stop_prewarming_execution()
}
/// Terminates the entire caching task.
///
/// If the [`BundleState`] is provided it will update the shared cache.
pub(super) fn terminate_caching(&mut self, block_output: Option<BundleState>) {
self.prewarm_handle.terminate_caching(block_output)
}
}
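To make the intended call order around `PayloadHandle` explicit, here is a hedged sketch of how a caller (such as `insert_block_inner2` above) is expected to drive it; the `execute_block` closure stands in for the real block executor and is purely hypothetical, and it assumes `OnStateHook` is object-safe, as its boxed use in `insert_block_inner2` suggests.

// Hypothetical driver showing the intended PayloadHandle call order.
fn drive_payload<F>(
    mut handle: PayloadHandle,
    execute_block: F,
) -> Result<StateRootComputeOutcome, ParallelStateRootError>
where
    F: FnOnce(Box<dyn OnStateHook>) -> BundleState,
{
    // 1. Execute the block sequentially; the state hook streams updates to the multiproof task.
    let output = execute_block(Box::new(handle.state_hook()));
    // 2. Sequential execution is done, so prewarm transactions are no longer useful.
    handle.stop_prewarming_execution();
    // 3. Await the background state root (panics if the handle was spawned without background tasks).
    let state_root = handle.state_root();
    // 4. Feed the final state back so the shared cache can be reused for the next payload.
    handle.terminate_caching(Some(output));
    state_root
}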
/// Access to the spawned [`PrewarmCacheTask`].
pub(crate) struct CacheTaskHandle {
/// The shared cache the task operates with.
cache: ProviderCaches,
/// Metrics for the caches
cache_metrics: CachedStateMetrics,
/// Channel to the spawned prewarm task if any
to_prewarm_task: Option<Sender<PrewarmTaskEvent>>,
}
impl CacheTaskHandle {
/// Terminates the pre-warming transaction processing.
///
/// Note: This does not terminate the task yet.
pub(super) fn stop_prewarming_execution(&self) {
self.to_prewarm_task
.as_ref()
.map(|tx| tx.send(PrewarmTaskEvent::TerminateTransactionExecution).ok());
}
/// Terminates the entire pre-warming task.
///
/// If the [`BundleState`] is provided it will update the shared cache.
pub(super) fn terminate_caching(&mut self, block_output: Option<BundleState>) {
self.to_prewarm_task
.take()
.map(|tx| tx.send(PrewarmTaskEvent::Terminate { block_output }).ok());
}
}
impl Drop for CacheTaskHandle {
fn drop(&mut self) {
// Ensure we always terminate on drop
self.terminate_caching(None);
}
}
/// Shared access to most recently used cache.
///
/// This cache is intended to be used for processing the payload in the following manner:
/// - Get the cache if the cached block is the payload's parent block
/// - Update cache upon successful payload execution
///
/// This process assumes that payloads are received sequentially.
#[derive(Clone, Debug, Default)]
struct ExecutionCache {
/// Guarded cloneable cache identified by a block hash.
inner: Arc<RwLock<Option<SavedCache>>>,
}
impl ExecutionCache {
/// Returns the cache if the currently stored cache is for the given `parent_hash`
pub(crate) fn get_cache_for(&self, parent_hash: B256) -> Option<SavedCache> {
let cache = self.inner.read();
cache
.as_ref()
.and_then(|cache| (cache.executed_block_hash() == parent_hash).then(|| cache.clone()))
}
/// Clears the tracked cache
#[allow(unused)]
pub(crate) fn clear(&self) {
self.inner.write().take();
}
/// Stores the provider cache
pub(crate) fn save_cache(&self, cache: SavedCache) {
self.inner.write().replace(cache);
}
}
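A small sketch of the contract described above (hypothetical calling code; the 8 MB size is an arbitrary placeholder): the stored cache is only handed out when its executed block hash equals the new payload's parent hash, otherwise fresh caches are built.

// Hypothetical lookup mirroring how PayloadProcessor::cache_for uses ExecutionCache.
fn cache_for_parent(execution_cache: &ExecutionCache, parent_hash: B256) -> SavedCache {
    match execution_cache.get_cache_for(parent_hash) {
        // Hit: the previously executed block is this payload's parent, reuse its caches.
        Some(saved) => saved,
        // Miss: start from empty caches and zeroed metrics.
        None => SavedCache::new(
            parent_hash,
            ProviderCacheBuilder::default().build_caches(8 * 1024 * 1024),
            CachedStateMetrics::zeroed(),
        ),
    }
}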

File diff suppressed because it is too large

View File

@@ -0,0 +1,305 @@
use crate::tree::{
cached_state::{CachedStateMetrics, CachedStateProvider, ProviderCaches, SavedCache},
payload_processor::{
executor::WorkloadExecutor, multiproof::MultiProofMessage, ExecutionCache,
},
StateProviderBuilder,
};
use alloy_consensus::transaction::Recovered;
use alloy_primitives::{keccak256, map::B256Set, B256};
use reth_evm::{ConfigureEvm, ConfigureEvmEnvFor, Evm};
use reth_primitives_traits::{header::SealedHeaderFor, NodePrimitives, SignedTransaction};
use reth_provider::{BlockReader, StateCommitmentProvider, StateProviderFactory, StateReader};
use reth_revm::{database::StateProviderDatabase, db::BundleState, state::EvmState};
use reth_trie::MultiProofTargets;
use std::{
collections::VecDeque,
sync::mpsc::{channel, Receiver, Sender},
};
use tracing::{debug, trace};
/// A task that is responsible for caching and prewarming the cache by executing transactions
/// individually in parallel.
///
/// Note: This task runs until cancelled externally.
pub(super) struct PrewarmCacheTask<N: NodePrimitives, P, Evm> {
/// The executor used to spawn execution tasks.
executor: WorkloadExecutor,
/// Shared execution cache.
execution_cache: ExecutionCache,
/// Transactions pending execution.
pending: VecDeque<Recovered<N::SignedTx>>,
/// Context provided to execution tasks
ctx: PrewarmContext<N, P, Evm>,
/// How many txs are currently in progress
in_progress: usize,
/// How many transactions should be executed in parallel
max_concurrency: usize,
/// Sender to emit evm state outcome messages, if any.
to_multi_proof: Option<Sender<MultiProofMessage>>,
/// Receiver for events produced by tx execution
actions_rx: Receiver<PrewarmTaskEvent>,
/// Sender the transactions use to send their result back
actions_tx: Sender<PrewarmTaskEvent>,
}
impl<N, P, Evm> PrewarmCacheTask<N, P, Evm>
where
N: NodePrimitives,
P: BlockReader + StateProviderFactory + StateReader + StateCommitmentProvider + Clone + 'static,
Evm: ConfigureEvmEnvFor<N>
+ 'static
+ ConfigureEvm<Header = N::BlockHeader, Transaction = N::SignedTx>
+ 'static,
{
/// Initializes the task with the given transactions pending execution
pub(super) fn new(
executor: WorkloadExecutor,
execution_cache: ExecutionCache,
ctx: PrewarmContext<N, P, Evm>,
to_multi_proof: Option<Sender<MultiProofMessage>>,
pending: VecDeque<Recovered<N::SignedTx>>,
) -> Self {
let (actions_tx, actions_rx) = channel();
Self {
executor,
execution_cache,
pending,
ctx,
in_progress: 0,
// TODO settings
max_concurrency: 4,
to_multi_proof,
actions_rx,
actions_tx,
}
}
/// Returns the sender that can communicate with this task.
pub(super) fn actions_tx(&self) -> Sender<PrewarmTaskEvent> {
self.actions_tx.clone()
}
/// Spawns the next transactions
fn spawn_next(&mut self) {
while self.in_progress < self.max_concurrency {
if let Some(tx) = self.pending.pop_front() {
self.spawn_transaction(tx);
} else {
break
}
}
}
/// Spawns the given transaction as a blocking task.
fn spawn_transaction(&self, tx: Recovered<N::SignedTx>) {
let ctx = self.ctx.clone();
let actions_tx = self.actions_tx.clone();
let prepare_proof_targets = self.should_prepare_multi_proof_targets();
self.executor.spawn_blocking(move || {
// depending on whether this task needs the proof targets we either just transact or
// transact and prepare the targets
let proof_targets = if prepare_proof_targets {
ctx.prepare_multiproof_targets(tx)
} else {
ctx.transact(tx);
None
};
let _ = actions_tx.send(PrewarmTaskEvent::Outcome { proof_targets });
});
}
/// Returns true if the tx prewarming tasks should prepare multiproof targets.
fn should_prepare_multi_proof_targets(&self) -> bool {
self.to_multi_proof.is_some()
}
/// If configured and the tx returned proof targets, emit the targets the transaction produced
fn send_multi_proof_targets(&self, targets: Option<MultiProofTargets>) {
if let Some((proof_targets, to_multi_proof)) = targets.zip(self.to_multi_proof.as_ref()) {
let _ = to_multi_proof.send(MultiProofMessage::PrefetchProofs(proof_targets));
}
}
/// Save the state to the shared cache for the given block.
fn save_cache(&self, state: BundleState) {
let cache = SavedCache::new(
self.ctx.header.hash(),
self.ctx.cache.clone(),
self.ctx.cache_metrics.clone(),
);
if cache.cache().insert_state(&state).is_err() {
return
}
cache.update_metrics();
debug!(target: "engine::caching", "Updated state caches");
// update the cache for the executed block
self.execution_cache.save_cache(cache);
}
/// Executes the task.
///
/// This will execute the transactions until all transactions have been processed or the task
/// was cancelled.
pub(super) fn run(mut self) {
// spawn execution tasks.
self.spawn_next();
while let Ok(event) = self.actions_rx.recv() {
match event {
PrewarmTaskEvent::TerminateTransactionExecution => {
// stop tx processing
self.pending.clear();
}
PrewarmTaskEvent::Outcome { proof_targets } => {
// completed a transaction, frees up one slot
self.in_progress -= 1;
self.send_multi_proof_targets(proof_targets);
}
PrewarmTaskEvent::Terminate { block_output } => {
// terminate the task
if let Some(state) = block_output {
self.save_cache(state);
}
break
}
}
// schedule followup transactions
self.spawn_next();
}
}
}
/// Context required by tx execution tasks.
#[derive(Debug, Clone)]
pub(super) struct PrewarmContext<N: NodePrimitives, P, Evm> {
pub(super) header: SealedHeaderFor<N>,
pub(super) evm_config: Evm,
pub(super) cache: ProviderCaches,
pub(super) cache_metrics: CachedStateMetrics,
/// Provider to obtain the state
pub(super) provider: StateProviderBuilder<N, P>,
}
impl<N, P, Evm> PrewarmContext<N, P, Evm>
where
N: NodePrimitives,
P: BlockReader + StateProviderFactory + StateReader + StateCommitmentProvider + Clone + 'static,
Evm: ConfigureEvmEnvFor<N>
+ 'static
+ ConfigureEvm<Header = N::BlockHeader, Transaction = N::SignedTx>
+ 'static,
{
/// Transacts the transaction and transforms the state into [`MultiProofTargets`].
fn prepare_multiproof_targets(self, tx: Recovered<N::SignedTx>) -> Option<MultiProofTargets> {
let state = self.transact(tx)?;
let mut targets = MultiProofTargets::with_capacity(state.len());
for (addr, account) in state {
// if the account was not touched, or if the account was selfdestructed, do not
// fetch proofs for it
//
// Since selfdestruct can only happen in the same transaction, we can skip
// prefetching proofs for selfdestructed accounts
//
// See: https://eips.ethereum.org/EIPS/eip-6780
if !account.is_touched() || account.is_selfdestructed() {
continue
}
let mut storage_set =
B256Set::with_capacity_and_hasher(account.storage.len(), Default::default());
for (key, slot) in account.storage {
// do nothing if unchanged
if !slot.is_changed() {
continue
}
storage_set.insert(keccak256(B256::new(key.to_be_bytes())));
}
targets.insert(keccak256(addr), storage_set);
}
Some(targets)
}
/// Transacts the transaction and returns the state outcome.
///
/// Returns `None` if executing the transaction failed with a non-revert error.
/// Returns the touched+modified state of the transaction.
///
/// Note: Since there are no ordering guarantees, this is not necessarily the state the tx
/// produces when executed sequentially.
fn transact(self, tx: Recovered<N::SignedTx>) -> Option<EvmState> {
let Self { header, evm_config, cache: caches, cache_metrics, provider } = self;
// Create the state provider inside the thread
let state_provider = match provider.build() {
Ok(provider) => provider,
Err(err) => {
trace!(
target: "engine::tree",
%err,
"Failed to build state provider in prewarm thread"
);
return None
}
};
// Use the caches to create a new provider with caching
let state_provider =
CachedStateProvider::new_with_caches(state_provider, caches, cache_metrics);
let state_provider = StateProviderDatabase::new(&state_provider);
let mut evm_env = evm_config.evm_env(&header);
// we must disable the nonce check so that we can execute the transaction even if the nonce
// doesn't match what's on chain.
evm_env.cfg_env.disable_nonce_check = true;
// create a new executor and disable nonce checks in the env
let mut evm = evm_config.evm_with_env(state_provider, evm_env);
// create the tx env
let tx_env = evm_config.tx_env(&tx);
let res = match evm.transact(tx_env) {
Ok(res) => res,
Err(err) => {
trace!(
target: "engine::tree",
%err,
tx_hash=%tx.tx_hash(),
sender=%tx.signer(),
"Error when executing prewarm transaction",
);
return None
}
};
Some(res.state)
}
}
/// The events the pre-warm task can handle.
pub(super) enum PrewarmTaskEvent {
/// Forcefully terminate all remaining transaction execution.
TerminateTransactionExecution,
/// Forcefully terminate the task on demand and update the shared cache with the given output
/// before exiting.
Terminate {
/// The final block state output.
block_output: Option<BundleState>,
},
/// The outcome of a pre-warm task
Outcome {
/// The prepared proof targets based on the evm state outcome
proof_targets: Option<MultiProofTargets>,
},
}
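The prewarm task above throttles execution with a simple counter: at most `max_concurrency` transactions are in flight, and every `Outcome` event both frees a slot and triggers `spawn_next`. The following standalone sketch (plain threads and `std::sync::mpsc`, toy data, no reth types) illustrates that bounded-concurrency event loop in isolation.

use std::{collections::VecDeque, sync::mpsc, thread};

/// Events in this simplified loop, mirroring the shape of PrewarmTaskEvent.
#[allow(dead_code)] // Terminate would be sent by an external controller, not in this sketch.
enum Event {
    Outcome,
    Terminate,
}

fn run_bounded(mut pending: VecDeque<u64>, max_concurrency: usize) {
    let (tx, rx) = mpsc::channel();
    let mut in_progress = 0usize;

    // Spawn jobs until the concurrency limit is reached or the queue is empty.
    let spawn_next = |pending: &mut VecDeque<u64>, in_progress: &mut usize| {
        while *in_progress < max_concurrency {
            let Some(job) = pending.pop_front() else { break };
            *in_progress += 1;
            let tx = tx.clone();
            thread::spawn(move || {
                // Stand-in for executing one transaction against the shared cache.
                let _ = job.wrapping_mul(2);
                let _ = tx.send(Event::Outcome);
            });
        }
    };

    spawn_next(&mut pending, &mut in_progress);
    while let Ok(event) = rx.recv() {
        match event {
            // A job finished: free its slot so follow-up work can be scheduled.
            Event::Outcome => in_progress -= 1,
            // External cancellation: stop scheduling and exit.
            Event::Terminate => break,
        }
        spawn_next(&mut pending, &mut in_progress);
        if in_progress == 0 && pending.is_empty() {
            break;
        }
    }
}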

View File

@@ -0,0 +1,198 @@
//! Contains the implementation of the sparse trie logic responsible for computing the state root.
use crate::tree::payload_processor::{
executor::WorkloadExecutor,
multiproof::{MultiProofConfig, MultiProofTaskMetrics, SparseTrieUpdate},
};
use alloy_primitives::B256;
use rayon::iter::{ParallelBridge, ParallelIterator};
use reth_provider::{BlockReader, DBProvider, DatabaseProviderFactory, StateCommitmentProvider};
use reth_trie::{
hashed_cursor::HashedPostStateCursorFactory, proof::ProofBlindedProviderFactory,
trie_cursor::InMemoryTrieCursorFactory, updates::TrieUpdates, Nibbles,
};
use reth_trie_db::{DatabaseHashedCursorFactory, DatabaseTrieCursorFactory};
use reth_trie_parallel::root::ParallelStateRootError;
use reth_trie_sparse::{
blinded::{BlindedProvider, BlindedProviderFactory},
errors::{SparseStateTrieResult, SparseTrieErrorKind},
SparseStateTrie,
};
use std::{
sync::mpsc,
time::{Duration, Instant},
};
use tracing::{debug, trace, trace_span};
/// The level below which the sparse trie hashes are calculated in
/// [`update_sparse_trie`].
const SPARSE_TRIE_INCREMENTAL_LEVEL: usize = 2;
/// A task responsible for populating the sparse trie.
pub(super) struct SparseTrieTask<F> {
/// Executor used to spawn subtasks.
#[allow(unused)] // TODO use this for spawning trie tasks
pub(super) executor: WorkloadExecutor,
/// Receives updates from the state root task.
pub(super) updates: mpsc::Receiver<SparseTrieEvent>,
// TODO: ideally we need a way to create multiple readers on demand.
pub(super) config: MultiProofConfig<F>,
pub(super) metrics: MultiProofTaskMetrics,
}
impl<F> SparseTrieTask<F>
where
F: DatabaseProviderFactory<Provider: BlockReader> + StateCommitmentProvider,
{
/// Runs the sparse trie task to completion.
///
/// This waits for new incoming [`SparseTrieUpdate`].
///
/// This concludes once the last trie update has been received.
pub(super) fn run(self) -> Result<StateRootComputeOutcome, ParallelStateRootError> {
let now = Instant::now();
let provider_ro = self.config.consistent_view.provider_ro()?;
let in_memory_trie_cursor = InMemoryTrieCursorFactory::new(
DatabaseTrieCursorFactory::new(provider_ro.tx_ref()),
&self.config.nodes_sorted,
);
let blinded_provider_factory = ProofBlindedProviderFactory::new(
in_memory_trie_cursor.clone(),
HashedPostStateCursorFactory::new(
DatabaseHashedCursorFactory::new(provider_ro.tx_ref()),
&self.config.state_sorted,
),
self.config.prefix_sets.clone(),
);
let mut num_iterations = 0;
let mut trie = SparseStateTrie::new(blinded_provider_factory).with_updates(true);
while let Ok(mut update) = self.updates.recv() {
num_iterations += 1;
let mut num_updates = 1;
while let Ok(next) = self.updates.try_recv() {
update.extend(next);
num_updates += 1;
}
debug!(
target: "engine::root",
num_updates,
account_proofs = update.multiproof.account_subtree.len(),
storage_proofs = update.multiproof.storages.len(),
"Updating sparse trie"
);
let elapsed = update_sparse_trie(&mut trie, update).map_err(|e| {
ParallelStateRootError::Other(format!("could not calculate state root: {e:?}"))
})?;
self.metrics.sparse_trie_update_duration_histogram.record(elapsed);
trace!(target: "engine::root", ?elapsed, num_iterations, "Root calculation completed");
}
debug!(target: "engine::root", num_iterations, "All proofs processed, ending calculation");
let start = Instant::now();
let (state_root, trie_updates) = trie.root_with_updates().map_err(|e| {
ParallelStateRootError::Other(format!("could not calculate state root: {e:?}"))
})?;
self.metrics.sparse_trie_final_update_duration_histogram.record(start.elapsed());
Ok(StateRootComputeOutcome {
state_root: (state_root, trie_updates),
total_time: now.elapsed(),
})
}
}
/// Outcome of the state root computation, including the state root itself with
/// the trie updates and the total time spent.
#[derive(Debug)]
pub(crate) struct StateRootComputeOutcome {
/// The computed state root and trie updates
pub state_root: (B256, TrieUpdates),
/// The total time spent calculating the state root
pub total_time: Duration,
}
/// Aliased for now to not introduce too many changes at once.
pub(super) type SparseTrieEvent = SparseTrieUpdate;
// /// The event type the sparse trie task operates on.
// pub(crate) enum SparseTrieEvent {
// /// Updates received from the multiproof task.
// ///
// /// This represents a stream of [`SparseTrieUpdate`] where a `None` indicates that all
// /// updates have been received.
// Update(Option<SparseTrieUpdate>),
// }
/// Updates the sparse trie with the given proofs and state, and returns the elapsed time.
pub(crate) fn update_sparse_trie<BPF>(
trie: &mut SparseStateTrie<BPF>,
SparseTrieUpdate { state, multiproof }: SparseTrieUpdate,
) -> SparseStateTrieResult<Duration>
where
BPF: BlindedProviderFactory + Send + Sync,
BPF::AccountNodeProvider: BlindedProvider + Send + Sync,
BPF::StorageNodeProvider: BlindedProvider + Send + Sync,
{
trace!(target: "engine::root::sparse", "Updating sparse trie");
let started_at = Instant::now();
// Reveal new accounts and storage slots.
trie.reveal_multiproof(multiproof)?;
// Update storage slots with new values and calculate storage roots.
let (tx, rx) = mpsc::channel();
state
.storages
.into_iter()
.map(|(address, storage)| (address, storage, trie.take_storage_trie(&address)))
.par_bridge()
.map(|(address, storage, storage_trie)| {
let span = trace_span!(target: "engine::root::sparse", "Storage trie", ?address);
let _enter = span.enter();
trace!(target: "engine::root::sparse", "Updating storage");
let mut storage_trie = storage_trie.ok_or(SparseTrieErrorKind::Blind)?;
if storage.wiped {
trace!(target: "engine::root::sparse", "Wiping storage");
storage_trie.wipe()?;
}
for (slot, value) in storage.storage {
let slot_nibbles = Nibbles::unpack(slot);
if value.is_zero() {
trace!(target: "engine::root::sparse", ?slot, "Removing storage slot");
storage_trie.remove_leaf(&slot_nibbles)?;
} else {
trace!(target: "engine::root::sparse", ?slot, "Updating storage slot");
storage_trie
.update_leaf(slot_nibbles, alloy_rlp::encode_fixed_size(&value).to_vec())?;
}
}
storage_trie.root();
SparseStateTrieResult::Ok((address, storage_trie))
})
.for_each_init(|| tx.clone(), |tx, result| tx.send(result).unwrap());
drop(tx);
for result in rx {
let (address, storage_trie) = result?;
trie.insert_storage_trie(address, storage_trie);
}
// Update accounts with new values
for (address, account) in state.accounts {
trace!(target: "engine::root::sparse", ?address, "Updating account");
trie.update_account(address, account.unwrap_or_default())?;
}
trie.calculate_below_level(SPARSE_TRIE_INCREMENTAL_LEVEL);
let elapsed = started_at.elapsed();
Ok(elapsed)
}
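The storage-trie loop in `update_sparse_trie` fans work out with rayon's `par_bridge` and funnels fallible results back through an `mpsc` channel before applying them sequentially. A standalone sketch of that collect-via-channel pattern (toy data and an arbitrary doubling computation, no trie types) for reference:

use rayon::iter::{ParallelBridge, ParallelIterator};
use std::sync::mpsc;

// Process key/value pairs in parallel, then fold the results sequentially.
fn parallel_then_sequential(inputs: Vec<(u64, u64)>) -> Result<u64, String> {
    let (tx, rx) = mpsc::channel();

    inputs
        .into_iter()
        .par_bridge()
        .map(|(key, value)| {
            // Stand-in for computing one storage root; may fail.
            if value == u64::MAX {
                return Err(format!("bad value for key {key}"));
            }
            Ok((key, value * 2))
        })
        // Clone the sender once per rayon worker and push each result through it.
        .for_each_init(|| tx.clone(), |tx, result| tx.send(result).unwrap());

    // Dropping the original sender lets the receiver iterator terminate.
    drop(tx);

    // Apply results sequentially, propagating the first error.
    let mut acc = 0;
    for result in rx {
        let (_key, doubled) = result?;
        acc += doubled;
    }
    Ok(acc)
}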

View File

@@ -498,7 +498,7 @@ where
#[derive(Metrics, Clone)]
#[metrics(scope = "tree.root")]
struct StateRootTaskMetrics {
pub(crate) struct StateRootTaskMetrics {
/// Histogram of proof calculation durations.
pub proof_calculation_duration_histogram: Histogram,
/// Histogram of proof calculation account targets.