mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-04-08 03:01:12 -04:00
perf(trie): Re-use storage tries across payloads (#17488)
Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
This commit is contained in:
@@ -34,6 +34,12 @@ impl From<ParallelSparseTrie> for ConfiguredSparseTrie {
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for ConfiguredSparseTrie {
|
||||
fn default() -> Self {
|
||||
Self::Serial(Default::default())
|
||||
}
|
||||
}
|
||||
|
||||
impl SparseTrieInterface for ConfiguredSparseTrie {
|
||||
fn with_root(
|
||||
self,
|
||||
|
||||
@@ -14,7 +14,7 @@ use alloy_evm::block::StateChangeSource;
|
||||
use alloy_primitives::B256;
|
||||
use executor::WorkloadExecutor;
|
||||
use multiproof::{SparseTrieUpdate, *};
|
||||
use parking_lot::{Mutex, RwLock};
|
||||
use parking_lot::RwLock;
|
||||
use prewarm::PrewarmMetrics;
|
||||
use reth_evm::{ConfigureEvm, OnStateHook, SpecFor};
|
||||
use reth_primitives_traits::{NodePrimitives, SealedHeaderFor};
|
||||
@@ -30,9 +30,8 @@ use reth_trie_parallel::{
|
||||
};
|
||||
use reth_trie_sparse::{
|
||||
provider::{TrieNodeProvider, TrieNodeProviderFactory},
|
||||
SerialSparseTrie, SparseTrie, SparseTrieInterface,
|
||||
ClearedSparseStateTrie, SerialSparseTrie, SparseStateTrie, SparseTrie,
|
||||
};
|
||||
use reth_trie_sparse_parallel::ParallelSparseTrie;
|
||||
use std::{
|
||||
collections::VecDeque,
|
||||
sync::{
|
||||
@@ -75,9 +74,11 @@ where
|
||||
precompile_cache_disabled: bool,
|
||||
/// Precompile cache map.
|
||||
precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
|
||||
/// A cleared accounts sparse trie, kept around to be reused for the state root computation so
|
||||
/// A cleared `SparseStateTrie`, kept around to be reused for the state root computation so
|
||||
/// that allocations can be minimized.
|
||||
accounts_trie: Arc<Mutex<Option<SparseTrie<ConfiguredSparseTrie>>>>,
|
||||
sparse_state_trie: Arc<
|
||||
parking_lot::Mutex<Option<ClearedSparseStateTrie<ConfiguredSparseTrie, SerialSparseTrie>>>,
|
||||
>,
|
||||
/// Whether to use the parallel sparse trie.
|
||||
use_parallel_sparse_trie: bool,
|
||||
_marker: std::marker::PhantomData<N>,
|
||||
@@ -104,7 +105,7 @@ where
|
||||
evm_config,
|
||||
precompile_cache_disabled: config.precompile_cache_disabled(),
|
||||
precompile_cache_map,
|
||||
accounts_trie: Arc::default(),
|
||||
sparse_state_trie: Arc::default(),
|
||||
use_parallel_sparse_trie: config.enable_parallel_sparse_trie(),
|
||||
_marker: Default::default(),
|
||||
}
|
||||
@@ -209,17 +210,8 @@ where
|
||||
// wire the sparse trie to the state root response receiver
|
||||
let (state_root_tx, state_root_rx) = channel();
|
||||
|
||||
// Take the stored accounts trie
|
||||
let stored_accounts_trie = self.accounts_trie.lock().take();
|
||||
|
||||
// Spawn the sparse trie task using any stored trie and parallel trie configuration.
|
||||
self.spawn_sparse_trie_task(
|
||||
sparse_trie_rx,
|
||||
proof_task.handle(),
|
||||
state_root_tx,
|
||||
stored_accounts_trie,
|
||||
self.use_parallel_sparse_trie,
|
||||
);
|
||||
self.spawn_sparse_trie_task(sparse_trie_rx, proof_task.handle(), state_root_tx);
|
||||
|
||||
// spawn the proof task
|
||||
self.executor.spawn_blocking(move || {
|
||||
@@ -320,127 +312,51 @@ where
|
||||
})
|
||||
}
|
||||
|
||||
/// Generic function to spawn a sparse trie task for any trie type that can be converted to
|
||||
/// `ConfiguredSparseTrie`.
|
||||
fn spawn_trie_task<BPF, A>(
|
||||
&self,
|
||||
sparse_trie_rx: mpsc::Receiver<SparseTrieUpdate>,
|
||||
proof_task_handle: BPF,
|
||||
state_root_tx: mpsc::Sender<Result<StateRootComputeOutcome, ParallelStateRootError>>,
|
||||
sparse_trie: Option<SparseTrie<A>>,
|
||||
) where
|
||||
BPF: TrieNodeProviderFactory + Clone + Send + Sync + 'static,
|
||||
BPF::AccountNodeProvider: TrieNodeProvider + Send + Sync,
|
||||
BPF::StorageNodeProvider: TrieNodeProvider + Send + Sync,
|
||||
A: SparseTrieInterface + Send + Sync + Default + 'static,
|
||||
ConfiguredSparseTrie: From<A>,
|
||||
{
|
||||
let mut task = SparseTrieTask::<_, A, SerialSparseTrie>::new_with_stored_trie(
|
||||
self.executor.clone(),
|
||||
sparse_trie_rx,
|
||||
proof_task_handle,
|
||||
self.trie_metrics.clone(),
|
||||
sparse_trie,
|
||||
);
|
||||
|
||||
let accounts_trie = Arc::clone(&self.accounts_trie);
|
||||
self.executor.spawn_blocking(move || {
|
||||
let (result, trie) = task.run();
|
||||
// Send state root computation result
|
||||
let _ = state_root_tx.send(result);
|
||||
|
||||
// Clear and return accounts trie back to the payload processor
|
||||
let trie = match trie {
|
||||
SparseTrie::Blind(opt) => {
|
||||
SparseTrie::Blind(opt.map(|t| Box::new(ConfiguredSparseTrie::from(*t))))
|
||||
}
|
||||
SparseTrie::Revealed(t) => {
|
||||
SparseTrie::Revealed(Box::new(ConfiguredSparseTrie::from(*t)))
|
||||
}
|
||||
};
|
||||
accounts_trie.lock().replace(trie.clear());
|
||||
});
|
||||
}
|
||||
|
||||
/// Helper to dispatch trie spawn based on the `ConfiguredSparseTrie` variant
|
||||
fn dispatch_trie_spawn<BPF>(
|
||||
&self,
|
||||
configured_trie: ConfiguredSparseTrie,
|
||||
sparse_trie_rx: mpsc::Receiver<SparseTrieUpdate>,
|
||||
proof_task_handle: BPF,
|
||||
state_root_tx: mpsc::Sender<Result<StateRootComputeOutcome, ParallelStateRootError>>,
|
||||
is_revealed: bool,
|
||||
) where
|
||||
BPF: TrieNodeProviderFactory + Clone + Send + Sync + 'static,
|
||||
BPF::AccountNodeProvider: TrieNodeProvider + Send + Sync,
|
||||
BPF::StorageNodeProvider: TrieNodeProvider + Send + Sync,
|
||||
{
|
||||
match configured_trie {
|
||||
ConfiguredSparseTrie::Serial(boxed_serial) => {
|
||||
let trie = if is_revealed {
|
||||
Some(SparseTrie::Revealed(boxed_serial))
|
||||
} else {
|
||||
Some(SparseTrie::Blind(Some(boxed_serial)))
|
||||
};
|
||||
self.spawn_trie_task(sparse_trie_rx, proof_task_handle, state_root_tx, trie);
|
||||
}
|
||||
ConfiguredSparseTrie::Parallel(boxed_parallel) => {
|
||||
let trie = if is_revealed {
|
||||
Some(SparseTrie::Revealed(boxed_parallel))
|
||||
} else {
|
||||
Some(SparseTrie::Blind(Some(boxed_parallel)))
|
||||
};
|
||||
self.spawn_trie_task(sparse_trie_rx, proof_task_handle, state_root_tx, trie);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Helper method that handles sparse trie task spawning.
|
||||
///
|
||||
/// If we have a stored trie, we will reuse it for spawning. If we do not have a stored trie,
|
||||
/// we will create a new trie based on the configured trie type (parallel or serial).
|
||||
/// Spawns the [`SparseTrieTask`] for this payload processor.
|
||||
fn spawn_sparse_trie_task<BPF>(
|
||||
&self,
|
||||
sparse_trie_rx: mpsc::Receiver<SparseTrieUpdate>,
|
||||
proof_task_handle: BPF,
|
||||
state_root_tx: mpsc::Sender<Result<StateRootComputeOutcome, ParallelStateRootError>>,
|
||||
stored_accounts_trie: Option<SparseTrie<ConfiguredSparseTrie>>,
|
||||
use_parallel_for_new: bool,
|
||||
) where
|
||||
BPF: TrieNodeProviderFactory + Clone + Send + Sync + 'static,
|
||||
BPF::AccountNodeProvider: TrieNodeProvider + Send + Sync,
|
||||
BPF::StorageNodeProvider: TrieNodeProvider + Send + Sync,
|
||||
{
|
||||
let is_revealed = stored_accounts_trie.as_ref().is_some_and(|trie| trie.is_revealed());
|
||||
match stored_accounts_trie {
|
||||
Some(SparseTrie::Revealed(boxed) | SparseTrie::Blind(Some(boxed))) => {
|
||||
self.dispatch_trie_spawn(
|
||||
*boxed,
|
||||
sparse_trie_rx,
|
||||
proof_task_handle,
|
||||
state_root_tx,
|
||||
is_revealed,
|
||||
);
|
||||
}
|
||||
_ => {
|
||||
// No stored trie, create new based on config
|
||||
if use_parallel_for_new {
|
||||
self.spawn_trie_task::<_, ParallelSparseTrie>(
|
||||
sparse_trie_rx,
|
||||
proof_task_handle,
|
||||
state_root_tx,
|
||||
None,
|
||||
);
|
||||
} else {
|
||||
self.spawn_trie_task::<_, SerialSparseTrie>(
|
||||
sparse_trie_rx,
|
||||
proof_task_handle,
|
||||
state_root_tx,
|
||||
None,
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
// Reuse a stored SparseStateTrie, or create a new one using the desired configuration if
|
||||
// there's none to reuse.
|
||||
let cleared_sparse_trie = Arc::clone(&self.sparse_state_trie);
|
||||
let sparse_state_trie = cleared_sparse_trie.lock().take().unwrap_or_else(|| {
|
||||
let accounts_trie = if self.use_parallel_sparse_trie {
|
||||
ConfiguredSparseTrie::Parallel(Default::default())
|
||||
} else {
|
||||
ConfiguredSparseTrie::Serial(Default::default())
|
||||
};
|
||||
ClearedSparseStateTrie::from_state_trie(
|
||||
SparseStateTrie::new()
|
||||
.with_accounts_trie(SparseTrie::Blind(Some(Box::new(accounts_trie))))
|
||||
.with_updates(true),
|
||||
)
|
||||
});
|
||||
|
||||
let task =
|
||||
SparseTrieTask::<_, ConfiguredSparseTrie, SerialSparseTrie>::new_with_cleared_trie(
|
||||
self.executor.clone(),
|
||||
sparse_trie_rx,
|
||||
proof_task_handle,
|
||||
self.trie_metrics.clone(),
|
||||
sparse_state_trie,
|
||||
);
|
||||
|
||||
self.executor.spawn_blocking(move || {
|
||||
let (result, trie) = task.run();
|
||||
// Send state root computation result
|
||||
let _ = state_root_tx.send(result);
|
||||
|
||||
// Clear the SparseStateTrie and replace it back into the mutex _after_ sending results
|
||||
// to the next step, so that time spent clearing doesn't block the step after this one.
|
||||
cleared_sparse_trie.lock().replace(ClearedSparseStateTrie::from_state_trie(trie));
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -11,7 +11,7 @@ use reth_trie_parallel::root::ParallelStateRootError;
|
||||
use reth_trie_sparse::{
|
||||
errors::{SparseStateTrieResult, SparseTrieErrorKind},
|
||||
provider::{TrieNodeProvider, TrieNodeProviderFactory},
|
||||
SerialSparseTrie, SparseStateTrie, SparseTrie, SparseTrieInterface,
|
||||
ClearedSparseStateTrie, SerialSparseTrie, SparseStateTrie, SparseTrieInterface,
|
||||
};
|
||||
use std::{
|
||||
sync::mpsc,
|
||||
@@ -31,9 +31,7 @@ where
|
||||
pub(super) executor: WorkloadExecutor,
|
||||
/// Receives updates from the state root task.
|
||||
pub(super) updates: mpsc::Receiver<SparseTrieUpdate>,
|
||||
/// Sparse Trie initialized with the blinded provider factory.
|
||||
///
|
||||
/// It's kept as a field on the struct to prevent blocking on de-allocation in [`Self::run`].
|
||||
/// `SparseStateTrie` used for computing the state root.
|
||||
pub(super) trie: SparseStateTrie<A, S>,
|
||||
pub(super) metrics: MultiProofTaskMetrics,
|
||||
/// Trie node provider factory.
|
||||
@@ -48,80 +46,39 @@ where
|
||||
A: SparseTrieInterface + Send + Sync + Default,
|
||||
S: SparseTrieInterface + Send + Sync + Default,
|
||||
{
|
||||
/// Creates a new sparse trie task.
|
||||
pub(super) fn new(
|
||||
/// Creates a new sparse trie, pre-populating with a [`ClearedSparseStateTrie`].
|
||||
pub(super) fn new_with_cleared_trie(
|
||||
executor: WorkloadExecutor,
|
||||
updates: mpsc::Receiver<SparseTrieUpdate>,
|
||||
blinded_provider_factory: BPF,
|
||||
metrics: MultiProofTaskMetrics,
|
||||
sparse_state_trie: ClearedSparseStateTrie<A, S>,
|
||||
) -> Self {
|
||||
Self {
|
||||
executor,
|
||||
updates,
|
||||
metrics,
|
||||
trie: SparseStateTrie::new().with_updates(true),
|
||||
trie: sparse_state_trie.into_inner(),
|
||||
blinded_provider_factory,
|
||||
}
|
||||
}
|
||||
|
||||
/// Creates a new sparse trie, populating the accounts trie with the given `SparseTrie`, if it
|
||||
/// exists.
|
||||
pub(super) fn new_with_stored_trie(
|
||||
executor: WorkloadExecutor,
|
||||
updates: mpsc::Receiver<SparseTrieUpdate>,
|
||||
blinded_provider_factory: BPF,
|
||||
trie_metrics: MultiProofTaskMetrics,
|
||||
sparse_trie: Option<SparseTrie<A>>,
|
||||
) -> Self {
|
||||
if let Some(sparse_trie) = sparse_trie {
|
||||
Self::with_accounts_trie(
|
||||
executor,
|
||||
updates,
|
||||
blinded_provider_factory,
|
||||
trie_metrics,
|
||||
sparse_trie,
|
||||
)
|
||||
} else {
|
||||
Self::new(executor, updates, blinded_provider_factory, trie_metrics)
|
||||
}
|
||||
}
|
||||
|
||||
/// Creates a new sparse trie task, using the given [`SparseTrie::Blind`] for the accounts
|
||||
/// trie.
|
||||
pub(super) fn with_accounts_trie(
|
||||
executor: WorkloadExecutor,
|
||||
updates: mpsc::Receiver<SparseTrieUpdate>,
|
||||
blinded_provider_factory: BPF,
|
||||
metrics: MultiProofTaskMetrics,
|
||||
sparse_trie: SparseTrie<A>,
|
||||
) -> Self {
|
||||
debug_assert!(sparse_trie.is_blind());
|
||||
let trie = SparseStateTrie::new().with_updates(true).with_accounts_trie(sparse_trie);
|
||||
Self { executor, updates, metrics, trie, blinded_provider_factory }
|
||||
}
|
||||
|
||||
/// Runs the sparse trie task to completion.
|
||||
///
|
||||
/// This waits for new incoming [`SparseTrieUpdate`].
|
||||
///
|
||||
/// This concludes once the last trie update has been received.
|
||||
///
|
||||
/// NOTE: This function does not take `self` by value to prevent blocking on [`SparseStateTrie`]
|
||||
/// drop.
|
||||
///
|
||||
/// # Returns
|
||||
///
|
||||
/// - State root computation outcome.
|
||||
/// - Accounts trie that needs to be cleared and reused to avoid reallocations.
|
||||
/// - `SparseStateTrie` that needs to be cleared and reused to avoid reallocations.
|
||||
pub(super) fn run(
|
||||
&mut self,
|
||||
) -> (Result<StateRootComputeOutcome, ParallelStateRootError>, SparseTrie<A>) {
|
||||
mut self,
|
||||
) -> (Result<StateRootComputeOutcome, ParallelStateRootError>, SparseStateTrie<A, S>) {
|
||||
// run the main loop to completion
|
||||
let result = self.run_inner();
|
||||
// take the account trie so that we can reuse its already allocated data structures.
|
||||
let trie = self.trie.take_accounts_trie();
|
||||
|
||||
(result, trie)
|
||||
(result, self.trie)
|
||||
}
|
||||
|
||||
/// Inner function to run the sparse trie task to completion.
|
||||
|
||||
@@ -20,6 +20,36 @@ use reth_trie_common::{
|
||||
};
|
||||
use tracing::trace;
|
||||
|
||||
/// Provides type-safe re-use of cleared [`SparseStateTrie`]s, which helps to save allocations
|
||||
/// across payload runs.
|
||||
#[derive(Debug)]
|
||||
pub struct ClearedSparseStateTrie<
|
||||
A = SerialSparseTrie, // Account trie implementation
|
||||
S = SerialSparseTrie, // Storage trie implementation
|
||||
>(SparseStateTrie<A, S>);
|
||||
|
||||
impl<A, S> ClearedSparseStateTrie<A, S>
|
||||
where
|
||||
A: SparseTrieInterface + Default,
|
||||
S: SparseTrieInterface + Default,
|
||||
{
|
||||
/// Creates a [`ClearedSparseStateTrie`] by clearing all the existing internal state of a
|
||||
/// [`SparseStateTrie`] and then storing that instance for later re-use.
|
||||
pub fn from_state_trie(mut trie: SparseStateTrie<A, S>) -> Self {
|
||||
trie.state = trie.state.clear();
|
||||
trie.cleared_storages.extend(trie.storages.drain().map(|(_, trie)| trie.clear()));
|
||||
trie.revealed_account_paths.clear();
|
||||
trie.revealed_storage_paths.clear();
|
||||
trie.account_rlp_buf.clear();
|
||||
Self(trie)
|
||||
}
|
||||
|
||||
/// Returns the cleared [`SparseStateTrie`], consuming this instance.
|
||||
pub fn into_inner(self) -> SparseStateTrie<A, S> {
|
||||
self.0
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
/// Sparse state trie representing lazy-loaded Ethereum state trie.
|
||||
pub struct SparseStateTrie<
|
||||
@@ -30,6 +60,8 @@ pub struct SparseStateTrie<
|
||||
state: SparseTrie<A>,
|
||||
/// Sparse storage tries.
|
||||
storages: B256Map<SparseTrie<S>>,
|
||||
/// Cleared storage tries, kept for re-use
|
||||
cleared_storages: Vec<SparseTrie<S>>,
|
||||
/// Collection of revealed account trie paths.
|
||||
revealed_account_paths: HashSet<Nibbles>,
|
||||
/// Collection of revealed storage trie paths, per account.
|
||||
@@ -52,6 +84,7 @@ where
|
||||
Self {
|
||||
state: Default::default(),
|
||||
storages: Default::default(),
|
||||
cleared_storages: Default::default(),
|
||||
revealed_account_paths: Default::default(),
|
||||
revealed_storage_paths: Default::default(),
|
||||
retain_updates: false,
|
||||
@@ -70,16 +103,7 @@ impl SparseStateTrie {
|
||||
}
|
||||
}
|
||||
|
||||
impl<A, S> SparseStateTrie<A, S>
|
||||
where
|
||||
A: SparseTrieInterface + Default,
|
||||
S: SparseTrieInterface + Default,
|
||||
{
|
||||
/// Create new [`SparseStateTrie`]
|
||||
pub fn new() -> Self {
|
||||
Self::default()
|
||||
}
|
||||
|
||||
impl<A, S> SparseStateTrie<A, S> {
|
||||
/// Set the retention of branch node updates and deletions.
|
||||
pub const fn with_updates(mut self, retain_updates: bool) -> Self {
|
||||
self.retain_updates = retain_updates;
|
||||
@@ -91,10 +115,16 @@ where
|
||||
self.state = trie;
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
/// Takes the accounts trie.
|
||||
pub fn take_accounts_trie(&mut self) -> SparseTrie<A> {
|
||||
core::mem::take(&mut self.state)
|
||||
impl<A, S> SparseStateTrie<A, S>
|
||||
where
|
||||
A: SparseTrieInterface + Default,
|
||||
S: SparseTrieInterface + Default,
|
||||
{
|
||||
/// Create new [`SparseStateTrie`]
|
||||
pub fn new() -> Self {
|
||||
Self::default()
|
||||
}
|
||||
|
||||
/// Returns `true` if account was already revealed.
|
||||
@@ -166,6 +196,16 @@ where
|
||||
self.storages.insert(address, storage_trie);
|
||||
}
|
||||
|
||||
/// Retrieves the storage trie for the given address, creating a new one if it doesn't exist.
|
||||
///
|
||||
/// This method should always be used to create a storage trie, as it will re-use previously
|
||||
/// allocated and cleared storage tries when possible.
|
||||
fn get_or_create_storage_trie(&mut self, address: B256) -> &mut SparseTrie<S> {
|
||||
self.storages
|
||||
.entry(address)
|
||||
.or_insert_with(|| self.cleared_storages.pop().unwrap_or_default())
|
||||
}
|
||||
|
||||
/// Reveal unknown trie paths from multiproof.
|
||||
/// NOTE: This method does not extensively validate the proof.
|
||||
pub fn reveal_multiproof(&mut self, multiproof: MultiProof) -> SparseStateTrieResult<()> {
|
||||
@@ -302,10 +342,11 @@ where
|
||||
if let Some(root_node) = root_node {
|
||||
// Reveal root node if it wasn't already.
|
||||
trace!(target: "trie::sparse", ?account, ?root_node, "Revealing root storage node");
|
||||
let trie = self.storages.entry(account).or_default().reveal_root(
|
||||
let retain_updates = self.retain_updates;
|
||||
let trie = self.get_or_create_storage_trie(account).reveal_root(
|
||||
root_node.node,
|
||||
root_node.masks,
|
||||
self.retain_updates,
|
||||
retain_updates,
|
||||
)?;
|
||||
|
||||
// Reserve the capacity for new nodes ahead of time, if the trie implementation
|
||||
@@ -380,13 +421,14 @@ where
|
||||
.get(&account)
|
||||
.is_none_or(|paths| !paths.contains(&path))
|
||||
{
|
||||
let storage_trie_entry = self.storages.entry(account).or_default();
|
||||
let retain_updates = self.retain_updates;
|
||||
let storage_trie_entry = self.get_or_create_storage_trie(account);
|
||||
if path.is_empty() {
|
||||
// Handle special storage state root node case.
|
||||
storage_trie_entry.reveal_root(
|
||||
trie_node,
|
||||
TrieMasks::none(),
|
||||
self.retain_updates,
|
||||
retain_updates,
|
||||
)?;
|
||||
} else {
|
||||
// Reveal non-root storage trie node.
|
||||
|
||||
Reference in New Issue
Block a user