mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-02-19 03:04:27 -05:00
perf: better scheduling for storage roots computation (#21987)
Co-authored-by: Brian Picciano <me@mediocregopher.com> Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
This commit is contained in:
@@ -11,7 +11,7 @@ use crate::tree::{
|
||||
use alloy_primitives::B256;
|
||||
use alloy_rlp::{Decodable, Encodable};
|
||||
use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
|
||||
use rayon::iter::{IntoParallelRefMutIterator, ParallelIterator};
|
||||
use rayon::iter::ParallelIterator;
|
||||
use reth_primitives_traits::{Account, ParallelBridgeBuffered};
|
||||
use reth_trie::{
|
||||
proof_v2::Target, updates::TrieUpdates, DecodedMultiProofV2, HashedPostState, Nibbles,
|
||||
@@ -675,6 +675,11 @@ where
|
||||
/// Invokes `update_leaves` for the accounts trie and collects any new targets.
|
||||
///
|
||||
/// Returns whether any updates were drained (applied to the trie).
|
||||
#[instrument(
|
||||
level = "debug",
|
||||
target = "engine::tree::payload_processor::sparse_trie",
|
||||
skip_all
|
||||
)]
|
||||
fn process_account_leaf_updates(&mut self, new: bool) -> SparseTrieResult<bool> {
|
||||
let account_updates =
|
||||
if new { &mut self.new_account_updates } else { &mut self.account_updates };
|
||||
@@ -718,53 +723,50 @@ where
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let span = tracing::Span::current();
|
||||
let roots = self
|
||||
let span = debug_span!("compute_storage_roots").entered();
|
||||
self
|
||||
.trie
|
||||
.storage_tries_mut()
|
||||
.par_iter_mut()
|
||||
.filter(|(address, _)| {
|
||||
self.storage_updates.get(*address).is_some_and(|updates| updates.is_empty())
|
||||
.iter_mut()
|
||||
.filter(|(address, trie)| {
|
||||
self.storage_updates.get(*address).is_some_and(|updates| updates.is_empty()) &&
|
||||
!trie.is_root_cached()
|
||||
})
|
||||
.map(|(address, trie)| {
|
||||
.par_bridge_buffered()
|
||||
.for_each(|(address, trie)| {
|
||||
let _enter = debug_span!(target: "engine::tree::payload_processor::sparse_trie", parent: &span, "storage root", ?address).entered();
|
||||
let root =
|
||||
trie.root().expect("updates are drained, trie should be revealed by now");
|
||||
|
||||
(address, root)
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
for (addr, storage_root) in roots {
|
||||
// If the storage root is known and we have a pending update for this account, encode it
|
||||
// into a proper update.
|
||||
if let Entry::Occupied(entry) = self.pending_account_updates.entry(*addr) &&
|
||||
entry.get().is_some()
|
||||
{
|
||||
let account = entry.remove().expect("just checked, should be Some");
|
||||
let encoded = if account.is_none_or(|account| account.is_empty()) &&
|
||||
storage_root == EMPTY_ROOT_HASH
|
||||
{
|
||||
Vec::new()
|
||||
} else {
|
||||
self.account_rlp_buf.clear();
|
||||
account
|
||||
.unwrap_or_default()
|
||||
.into_trie_account(storage_root)
|
||||
.encode(&mut self.account_rlp_buf);
|
||||
self.account_rlp_buf.clone()
|
||||
};
|
||||
self.account_updates.insert(*addr, LeafUpdate::Changed(encoded));
|
||||
}
|
||||
}
|
||||
trie.root().expect("updates are drained, trie should be revealed by now");
|
||||
});
|
||||
drop(span);
|
||||
|
||||
loop {
|
||||
let span = debug_span!("promote_updates", promoted = tracing::field::Empty).entered();
|
||||
// Now handle pending account updates that can be upgraded to a proper update.
|
||||
let account_rlp_buf = &mut self.account_rlp_buf;
|
||||
let mut num_promoted = 0;
|
||||
self.pending_account_updates.retain(|addr, account| {
|
||||
// If account has pending storage updates, it is still pending.
|
||||
if self.storage_updates.get(addr).is_some_and(|updates| !updates.is_empty()) {
|
||||
return true;
|
||||
if let Some(updates) = self.storage_updates.get(addr) {
|
||||
if !updates.is_empty() {
|
||||
// If account has pending storage updates, it is still pending.
|
||||
return true;
|
||||
} else if let Some(account) = account.take() {
|
||||
let storage_root = self.trie.storage_root(addr).expect("updates are drained, storage trie should be revealed by now");
|
||||
let encoded = if account.is_none_or(|account| account.is_empty()) &&
|
||||
storage_root == EMPTY_ROOT_HASH
|
||||
{
|
||||
Vec::new()
|
||||
} else {
|
||||
account_rlp_buf.clear();
|
||||
account
|
||||
.unwrap_or_default()
|
||||
.into_trie_account(storage_root)
|
||||
.encode(account_rlp_buf);
|
||||
account_rlp_buf.clone()
|
||||
};
|
||||
self.account_updates.insert(*addr, LeafUpdate::Changed(encoded));
|
||||
num_promoted += 1;
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
// Get the current account state either from the trie or from latest account update.
|
||||
@@ -799,15 +801,18 @@ where
|
||||
account_rlp_buf.clone()
|
||||
};
|
||||
self.account_updates.insert(*addr, LeafUpdate::Changed(encoded));
|
||||
num_promoted += 1;
|
||||
|
||||
false
|
||||
});
|
||||
span.record("promoted", num_promoted);
|
||||
drop(span);
|
||||
|
||||
// Only exit when no new updates are processed.
|
||||
//
|
||||
// We need to keep iterating if any updates are being drained because that might
|
||||
// indicate that more pending account updates can be promoted.
|
||||
if !self.process_account_leaf_updates(false)? {
|
||||
if num_promoted == 0 || !self.process_account_leaf_updates(false)? {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
@@ -898,6 +898,13 @@ impl SparseTrie for ParallelSparseTrie {
|
||||
fn root(&mut self) -> B256 {
|
||||
trace!(target: "trie::parallel_sparse", "Calculating trie root hash");
|
||||
|
||||
if self.prefix_set.is_empty() &&
|
||||
let Some(hash) =
|
||||
self.upper_subtrie.nodes.get(&Nibbles::default()).and_then(|node| node.hash())
|
||||
{
|
||||
return hash;
|
||||
}
|
||||
|
||||
// Update all lower subtrie hashes
|
||||
self.update_subtrie_hashes();
|
||||
|
||||
@@ -910,6 +917,14 @@ impl SparseTrie for ParallelSparseTrie {
|
||||
root_rlp.as_hash().unwrap_or(EMPTY_ROOT_HASH)
|
||||
}
|
||||
|
||||
fn is_root_cached(&self) -> bool {
|
||||
self.prefix_set.is_empty() &&
|
||||
self.upper_subtrie
|
||||
.nodes
|
||||
.get(&Nibbles::default())
|
||||
.is_some_and(|node| node.hash().is_some())
|
||||
}
|
||||
|
||||
#[instrument(level = "trace", target = "trie::sparse::parallel", skip(self))]
|
||||
fn update_subtrie_hashes(&mut self) {
|
||||
trace!(target: "trie::parallel_sparse", "Updating subtrie hashes");
|
||||
|
||||
@@ -188,6 +188,9 @@ pub trait SparseTrie: Sized + Debug + Send + Sync {
|
||||
/// The root hash of the trie.
|
||||
fn root(&mut self) -> B256;
|
||||
|
||||
/// Returns true if the root node is cached and does not need any recomputation.
|
||||
fn is_root_cached(&self) -> bool;
|
||||
|
||||
/// Recalculates and updates the RLP hashes of subtries deeper than a certain level. The level
|
||||
/// is defined in the implementation.
|
||||
///
|
||||
|
||||
@@ -199,6 +199,11 @@ impl<T: SparseTrieTrait> RevealableSparseTrie<T> {
|
||||
Some(self.as_revealed_mut()?.root())
|
||||
}
|
||||
|
||||
/// Returns true if the root node is cached and does not need any recomputation.
|
||||
pub fn is_root_cached(&self) -> bool {
|
||||
self.as_revealed_ref().is_some_and(|trie| trie.is_root_cached())
|
||||
}
|
||||
|
||||
/// Returns the root hash along with any accumulated update information.
|
||||
///
|
||||
/// This is useful for when you need both the root hash and information about
|
||||
@@ -965,6 +970,11 @@ impl SparseTrieTrait for SerialSparseTrie {
|
||||
}
|
||||
}
|
||||
|
||||
fn is_root_cached(&self) -> bool {
|
||||
self.prefix_set.is_empty() &&
|
||||
self.nodes.get(&Nibbles::default()).is_some_and(|node| node.is_hash())
|
||||
}
|
||||
|
||||
fn update_subtrie_hashes(&mut self) {
|
||||
self.update_rlp_node_level(SPARSE_TRIE_SUBTRIE_HASHES_LEVEL);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user