mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-04-08 03:01:12 -04:00
perf(engine): clear accounts trie in background to not block state root (#17369)
This commit is contained in:
@@ -2300,7 +2300,7 @@ where
|
||||
if use_state_root_task {
|
||||
debug!(target: "engine::tree", block=?block_num_hash, "Using sparse trie state root algorithm");
|
||||
match handle.state_root() {
|
||||
Ok(StateRootComputeOutcome { state_root, trie_updates, trie }) => {
|
||||
Ok(StateRootComputeOutcome { state_root, trie_updates }) => {
|
||||
let elapsed = execution_finish.elapsed();
|
||||
info!(target: "engine::tree", ?state_root, ?elapsed, "State root task finished");
|
||||
// we double check the state root here for good measure
|
||||
@@ -2314,9 +2314,6 @@ where
|
||||
"State root task returned incorrect state root"
|
||||
);
|
||||
}
|
||||
|
||||
// hold on to the sparse trie for the next payload
|
||||
self.payload_processor.set_sparse_trie(trie);
|
||||
}
|
||||
Err(error) => {
|
||||
debug!(target: "engine::tree", %error, "Background parallel state root computation failed");
|
||||
|
||||
@@ -15,7 +15,7 @@ use std::borrow::Cow;
|
||||
/// providing flexibility in choosing the appropriate implementation based on workload
|
||||
/// characteristics.
|
||||
#[derive(Debug)]
|
||||
pub enum ConfiguredSparseTrie {
|
||||
pub(crate) enum ConfiguredSparseTrie {
|
||||
/// Serial implementation of the sparse trie.
|
||||
Serial(Box<SerialSparseTrie>),
|
||||
/// Parallel implementation of the sparse trie.
|
||||
|
||||
@@ -14,7 +14,7 @@ use alloy_evm::block::StateChangeSource;
|
||||
use alloy_primitives::B256;
|
||||
use executor::WorkloadExecutor;
|
||||
use multiproof::{SparseTrieUpdate, *};
|
||||
use parking_lot::RwLock;
|
||||
use parking_lot::{Mutex, RwLock};
|
||||
use prewarm::PrewarmMetrics;
|
||||
use reth_evm::{ConfigureEvm, OnStateHook, SpecFor};
|
||||
use reth_primitives_traits::{NodePrimitives, SealedHeaderFor};
|
||||
@@ -75,9 +75,9 @@ where
|
||||
precompile_cache_disabled: bool,
|
||||
/// Precompile cache map.
|
||||
precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
|
||||
/// A cleared sparse trie, kept around to be reused for the state root computation so that
|
||||
/// allocations can be minimized.
|
||||
sparse_trie: Option<SparseTrie<ConfiguredSparseTrie>>,
|
||||
/// A cleared accounts sparse trie, kept around to be reused for the state root computation so
|
||||
/// that allocations can be minimized.
|
||||
accounts_trie: Arc<Mutex<Option<SparseTrie<ConfiguredSparseTrie>>>>,
|
||||
/// Whether to use the parallel sparse trie.
|
||||
use_parallel_sparse_trie: bool,
|
||||
_marker: std::marker::PhantomData<N>,
|
||||
@@ -104,7 +104,7 @@ where
|
||||
evm_config,
|
||||
precompile_cache_disabled: config.precompile_cache_disabled(),
|
||||
precompile_cache_map,
|
||||
sparse_trie: None,
|
||||
accounts_trie: Arc::default(),
|
||||
use_parallel_sparse_trie: config.enable_parallel_sparse_trie(),
|
||||
_marker: Default::default(),
|
||||
}
|
||||
@@ -209,15 +209,15 @@ where
|
||||
// wire the sparse trie to the state root response receiver
|
||||
let (state_root_tx, state_root_rx) = channel();
|
||||
|
||||
// Take the stored sparse trie
|
||||
let stored_trie = self.sparse_trie.take();
|
||||
// Take the stored accounts trie
|
||||
let stored_accounts_trie = self.accounts_trie.lock().take();
|
||||
|
||||
// Spawn the sparse trie task using any stored trie and parallel trie configuration.
|
||||
self.spawn_sparse_trie_task(
|
||||
sparse_trie_rx,
|
||||
proof_task.handle(),
|
||||
state_root_tx,
|
||||
stored_trie,
|
||||
stored_accounts_trie,
|
||||
self.use_parallel_sparse_trie,
|
||||
);
|
||||
|
||||
@@ -257,11 +257,6 @@ where
|
||||
PayloadHandle { to_multi_proof: None, prewarm_handle, state_root: None }
|
||||
}
|
||||
|
||||
/// Sets the sparse trie to be kept around for the state root computation.
|
||||
pub(super) fn set_sparse_trie(&mut self, sparse_trie: SparseTrie<ConfiguredSparseTrie>) {
|
||||
self.sparse_trie = Some(sparse_trie);
|
||||
}
|
||||
|
||||
/// Spawn prewarming optionally wired to the multiproof task for target updates.
|
||||
fn spawn_caching_with<P>(
|
||||
&self,
|
||||
@@ -331,9 +326,7 @@ where
|
||||
&self,
|
||||
sparse_trie_rx: mpsc::Receiver<SparseTrieUpdate>,
|
||||
proof_task_handle: BPF,
|
||||
state_root_tx: mpsc::Sender<
|
||||
Result<StateRootComputeOutcome<ConfiguredSparseTrie>, ParallelStateRootError>,
|
||||
>,
|
||||
state_root_tx: mpsc::Sender<Result<StateRootComputeOutcome, ParallelStateRootError>>,
|
||||
sparse_trie: Option<SparseTrie<A>>,
|
||||
) where
|
||||
BPF: BlindedProviderFactory + Clone + Send + Sync + 'static,
|
||||
@@ -350,21 +343,22 @@ where
|
||||
sparse_trie,
|
||||
);
|
||||
|
||||
let accounts_trie = Arc::clone(&self.accounts_trie);
|
||||
self.executor.spawn_blocking(move || {
|
||||
let res = task.run();
|
||||
let converted = res.map(|outcome| StateRootComputeOutcome {
|
||||
state_root: outcome.state_root,
|
||||
trie_updates: outcome.trie_updates,
|
||||
trie: match outcome.trie {
|
||||
SparseTrie::Blind(opt) => {
|
||||
SparseTrie::Blind(opt.map(|t| Box::new(ConfiguredSparseTrie::from(*t))))
|
||||
}
|
||||
SparseTrie::Revealed(t) => {
|
||||
SparseTrie::Revealed(Box::new(ConfiguredSparseTrie::from(*t)))
|
||||
}
|
||||
},
|
||||
});
|
||||
let _ = state_root_tx.send(converted);
|
||||
let (result, trie) = task.run();
|
||||
// Send state root computation result
|
||||
let _ = state_root_tx.send(result);
|
||||
|
||||
// Clear and return accounts trie back to the payload processor
|
||||
let trie = match trie {
|
||||
SparseTrie::Blind(opt) => {
|
||||
SparseTrie::Blind(opt.map(|t| Box::new(ConfiguredSparseTrie::from(*t))))
|
||||
}
|
||||
SparseTrie::Revealed(t) => {
|
||||
SparseTrie::Revealed(Box::new(ConfiguredSparseTrie::from(*t)))
|
||||
}
|
||||
};
|
||||
accounts_trie.lock().replace(trie.clear());
|
||||
});
|
||||
}
|
||||
|
||||
@@ -374,9 +368,7 @@ where
|
||||
configured_trie: ConfiguredSparseTrie,
|
||||
sparse_trie_rx: mpsc::Receiver<SparseTrieUpdate>,
|
||||
proof_task_handle: BPF,
|
||||
state_root_tx: mpsc::Sender<
|
||||
Result<StateRootComputeOutcome<ConfiguredSparseTrie>, ParallelStateRootError>,
|
||||
>,
|
||||
state_root_tx: mpsc::Sender<Result<StateRootComputeOutcome, ParallelStateRootError>>,
|
||||
is_revealed: bool,
|
||||
) where
|
||||
BPF: BlindedProviderFactory + Clone + Send + Sync + 'static,
|
||||
@@ -411,18 +403,16 @@ where
|
||||
&self,
|
||||
sparse_trie_rx: mpsc::Receiver<SparseTrieUpdate>,
|
||||
proof_task_handle: BPF,
|
||||
state_root_tx: mpsc::Sender<
|
||||
Result<StateRootComputeOutcome<ConfiguredSparseTrie>, ParallelStateRootError>,
|
||||
>,
|
||||
stored_trie: Option<SparseTrie<ConfiguredSparseTrie>>,
|
||||
state_root_tx: mpsc::Sender<Result<StateRootComputeOutcome, ParallelStateRootError>>,
|
||||
stored_accounts_trie: Option<SparseTrie<ConfiguredSparseTrie>>,
|
||||
use_parallel_for_new: bool,
|
||||
) where
|
||||
BPF: BlindedProviderFactory + Clone + Send + Sync + 'static,
|
||||
BPF::AccountNodeProvider: BlindedProvider + Send + Sync,
|
||||
BPF::StorageNodeProvider: BlindedProvider + Send + Sync,
|
||||
{
|
||||
let is_revealed = stored_trie.as_ref().is_some_and(|trie| trie.is_revealed());
|
||||
match stored_trie {
|
||||
let is_revealed = stored_accounts_trie.as_ref().is_some_and(|trie| trie.is_revealed());
|
||||
match stored_accounts_trie {
|
||||
Some(SparseTrie::Revealed(boxed) | SparseTrie::Blind(Some(boxed))) => {
|
||||
self.dispatch_trie_spawn(
|
||||
*boxed,
|
||||
@@ -462,11 +452,7 @@ pub struct PayloadHandle {
|
||||
// must include the receiver of the state root wired to the sparse trie
|
||||
prewarm_handle: CacheTaskHandle,
|
||||
/// Receiver for the state root
|
||||
state_root: Option<
|
||||
mpsc::Receiver<
|
||||
Result<StateRootComputeOutcome<ConfiguredSparseTrie>, ParallelStateRootError>,
|
||||
>,
|
||||
>,
|
||||
state_root: Option<mpsc::Receiver<Result<StateRootComputeOutcome, ParallelStateRootError>>>,
|
||||
}
|
||||
|
||||
impl PayloadHandle {
|
||||
@@ -475,9 +461,7 @@ impl PayloadHandle {
|
||||
/// # Panics
|
||||
///
|
||||
/// If payload processing was started without background tasks.
|
||||
pub fn state_root(
|
||||
&mut self,
|
||||
) -> Result<StateRootComputeOutcome<ConfiguredSparseTrie>, ParallelStateRootError> {
|
||||
pub fn state_root(&mut self) -> Result<StateRootComputeOutcome, ParallelStateRootError> {
|
||||
self.state_root
|
||||
.take()
|
||||
.expect("state_root is None")
|
||||
|
||||
@@ -108,7 +108,26 @@ where
|
||||
///
|
||||
/// NOTE: This function does not take `self` by value to prevent blocking on [`SparseStateTrie`]
|
||||
/// drop.
|
||||
pub(super) fn run(&mut self) -> Result<StateRootComputeOutcome<A>, ParallelStateRootError> {
|
||||
///
|
||||
/// # Returns
|
||||
///
|
||||
/// - State root computation outcome.
|
||||
/// - Accounts trie that needs to be cleared and re-used to avoid reallocations.
|
||||
pub(super) fn run(
|
||||
&mut self,
|
||||
) -> (Result<StateRootComputeOutcome, ParallelStateRootError>, SparseTrie<A>) {
|
||||
// run the main loop to completion
|
||||
let result = self.run_inner();
|
||||
// take the account trie so that we can reuse its already allocated data structures.
|
||||
let trie = self.trie.take_accounts_trie();
|
||||
|
||||
(result, trie)
|
||||
}
|
||||
|
||||
/// Inner function to run the sparse trie task to completion.
|
||||
///
|
||||
/// See [`Self::run`] for more information.
|
||||
fn run_inner(&mut self) -> Result<StateRootComputeOutcome, ParallelStateRootError> {
|
||||
let now = Instant::now();
|
||||
|
||||
let mut num_iterations = 0;
|
||||
@@ -151,23 +170,18 @@ where
|
||||
self.metrics.sparse_trie_final_update_duration_histogram.record(start.elapsed());
|
||||
self.metrics.sparse_trie_total_duration_histogram.record(now.elapsed());
|
||||
|
||||
// take the account trie so that we can reuse its already allocated data structures.
|
||||
let trie = self.trie.take_cleared_accounts_trie();
|
||||
|
||||
Ok(StateRootComputeOutcome { state_root, trie_updates, trie })
|
||||
Ok(StateRootComputeOutcome { state_root, trie_updates })
|
||||
}
|
||||
}
|
||||
|
||||
/// Outcome of the state root computation, including the state root itself with
|
||||
/// the trie updates.
|
||||
#[derive(Debug)]
|
||||
pub struct StateRootComputeOutcome<A = SerialSparseTrie> {
|
||||
pub struct StateRootComputeOutcome {
|
||||
/// The state root.
|
||||
pub state_root: B256,
|
||||
/// The trie updates.
|
||||
pub trie_updates: TrieUpdates,
|
||||
/// The account state trie.
|
||||
pub trie: SparseTrie<A>,
|
||||
}
|
||||
|
||||
/// Updates the sparse trie with the given proofs and state, and returns the elapsed time.
|
||||
|
||||
@@ -93,9 +93,9 @@ where
|
||||
self
|
||||
}
|
||||
|
||||
/// Takes the `SparseTrie` from within the state root and clears it if it is not blinded.
|
||||
pub fn take_cleared_accounts_trie(&mut self) -> SparseTrie<A> {
|
||||
core::mem::take(&mut self.state).clear()
|
||||
/// Takes the accounts trie.
|
||||
pub fn take_accounts_trie(&mut self) -> SparseTrie<A> {
|
||||
core::mem::take(&mut self.state)
|
||||
}
|
||||
|
||||
/// Returns `true` if account was already revealed.
|
||||
|
||||
Reference in New Issue
Block a user