From cdcea2bd334f8ac06f7479194d7abe71903781dc Mon Sep 17 00:00:00 2001 From: Arsenii Kulikov Date: Mon, 9 Feb 2026 22:10:45 +0400 Subject: [PATCH] perf: better scheduling for storage roots computation (#21987) Co-authored-by: Brian Picciano Co-authored-by: Matthias Seitz --- .../src/tree/payload_processor/sparse_trie.rs | 85 ++++++++++--------- crates/trie/sparse-parallel/src/trie.rs | 15 ++++ crates/trie/sparse/src/traits.rs | 3 + crates/trie/sparse/src/trie.rs | 10 +++ 4 files changed, 73 insertions(+), 40 deletions(-) diff --git a/crates/engine/tree/src/tree/payload_processor/sparse_trie.rs b/crates/engine/tree/src/tree/payload_processor/sparse_trie.rs index 8c37e296fe..0e378f778d 100644 --- a/crates/engine/tree/src/tree/payload_processor/sparse_trie.rs +++ b/crates/engine/tree/src/tree/payload_processor/sparse_trie.rs @@ -11,7 +11,7 @@ use crate::tree::{ use alloy_primitives::B256; use alloy_rlp::{Decodable, Encodable}; use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender}; -use rayon::iter::{IntoParallelRefMutIterator, ParallelIterator}; +use rayon::iter::ParallelIterator; use reth_primitives_traits::{Account, ParallelBridgeBuffered}; use reth_trie::{ proof_v2::Target, updates::TrieUpdates, DecodedMultiProofV2, HashedPostState, Nibbles, @@ -675,6 +675,11 @@ where /// Invokes `update_leaves` for the accounts trie and collects any new targets. /// /// Returns whether any updates were drained (applied to the trie). + #[instrument( + level = "debug", + target = "engine::tree::payload_processor::sparse_trie", + skip_all + )] fn process_account_leaf_updates(&mut self, new: bool) -> SparseTrieResult { let account_updates = if new { &mut self.new_account_updates } else { &mut self.account_updates }; @@ -718,53 +723,50 @@ where return Ok(()); } - let span = tracing::Span::current(); - let roots = self + let span = debug_span!("compute_storage_roots").entered(); + self .trie .storage_tries_mut() - .par_iter_mut() - .filter(|(address, _)| { - self.storage_updates.get(*address).is_some_and(|updates| updates.is_empty()) + .iter_mut() + .filter(|(address, trie)| { + self.storage_updates.get(*address).is_some_and(|updates| updates.is_empty()) && + !trie.is_root_cached() }) - .map(|(address, trie)| { + .par_bridge_buffered() + .for_each(|(address, trie)| { let _enter = debug_span!(target: "engine::tree::payload_processor::sparse_trie", parent: &span, "storage root", ?address).entered(); - let root = - trie.root().expect("updates are drained, trie should be revealed by now"); - - (address, root) - }) - .collect::>(); - - for (addr, storage_root) in roots { - // If the storage root is known and we have a pending update for this account, encode it - // into a proper update. - if let Entry::Occupied(entry) = self.pending_account_updates.entry(*addr) && - entry.get().is_some() - { - let account = entry.remove().expect("just checked, should be Some"); - let encoded = if account.is_none_or(|account| account.is_empty()) && - storage_root == EMPTY_ROOT_HASH - { - Vec::new() - } else { - self.account_rlp_buf.clear(); - account - .unwrap_or_default() - .into_trie_account(storage_root) - .encode(&mut self.account_rlp_buf); - self.account_rlp_buf.clone() - }; - self.account_updates.insert(*addr, LeafUpdate::Changed(encoded)); - } - } + trie.root().expect("updates are drained, trie should be revealed by now"); + }); + drop(span); loop { + let span = debug_span!("promote_updates", promoted = tracing::field::Empty).entered(); // Now handle pending account updates that can be upgraded to a proper update. let account_rlp_buf = &mut self.account_rlp_buf; + let mut num_promoted = 0; self.pending_account_updates.retain(|addr, account| { - // If account has pending storage updates, it is still pending. - if self.storage_updates.get(addr).is_some_and(|updates| !updates.is_empty()) { - return true; + if let Some(updates) = self.storage_updates.get(addr) { + if !updates.is_empty() { + // If account has pending storage updates, it is still pending. + return true; + } else if let Some(account) = account.take() { + let storage_root = self.trie.storage_root(addr).expect("updates are drained, storage trie should be revealed by now"); + let encoded = if account.is_none_or(|account| account.is_empty()) && + storage_root == EMPTY_ROOT_HASH + { + Vec::new() + } else { + account_rlp_buf.clear(); + account + .unwrap_or_default() + .into_trie_account(storage_root) + .encode(account_rlp_buf); + account_rlp_buf.clone() + }; + self.account_updates.insert(*addr, LeafUpdate::Changed(encoded)); + num_promoted += 1; + return false; + } } // Get the current account state either from the trie or from latest account update. @@ -799,15 +801,18 @@ where account_rlp_buf.clone() }; self.account_updates.insert(*addr, LeafUpdate::Changed(encoded)); + num_promoted += 1; false }); + span.record("promoted", num_promoted); + drop(span); // Only exit when no new updates are processed. // // We need to keep iterating if any updates are being drained because that might // indicate that more pending account updates can be promoted. - if !self.process_account_leaf_updates(false)? { + if num_promoted == 0 || !self.process_account_leaf_updates(false)? { break } } diff --git a/crates/trie/sparse-parallel/src/trie.rs b/crates/trie/sparse-parallel/src/trie.rs index 499f58bebe..092f9afa4b 100644 --- a/crates/trie/sparse-parallel/src/trie.rs +++ b/crates/trie/sparse-parallel/src/trie.rs @@ -898,6 +898,13 @@ impl SparseTrie for ParallelSparseTrie { fn root(&mut self) -> B256 { trace!(target: "trie::parallel_sparse", "Calculating trie root hash"); + if self.prefix_set.is_empty() && + let Some(hash) = + self.upper_subtrie.nodes.get(&Nibbles::default()).and_then(|node| node.hash()) + { + return hash; + } + // Update all lower subtrie hashes self.update_subtrie_hashes(); @@ -910,6 +917,14 @@ impl SparseTrie for ParallelSparseTrie { root_rlp.as_hash().unwrap_or(EMPTY_ROOT_HASH) } + fn is_root_cached(&self) -> bool { + self.prefix_set.is_empty() && + self.upper_subtrie + .nodes + .get(&Nibbles::default()) + .is_some_and(|node| node.hash().is_some()) + } + #[instrument(level = "trace", target = "trie::sparse::parallel", skip(self))] fn update_subtrie_hashes(&mut self) { trace!(target: "trie::parallel_sparse", "Updating subtrie hashes"); diff --git a/crates/trie/sparse/src/traits.rs b/crates/trie/sparse/src/traits.rs index 915f38434a..7508a325da 100644 --- a/crates/trie/sparse/src/traits.rs +++ b/crates/trie/sparse/src/traits.rs @@ -188,6 +188,9 @@ pub trait SparseTrie: Sized + Debug + Send + Sync { /// The root hash of the trie. fn root(&mut self) -> B256; + /// Returns true if the root node is cached and does not need any recomputation. + fn is_root_cached(&self) -> bool; + /// Recalculates and updates the RLP hashes of subtries deeper than a certain level. The level /// is defined in the implementation. /// diff --git a/crates/trie/sparse/src/trie.rs b/crates/trie/sparse/src/trie.rs index 365bfdecb7..4d0d85e6f2 100644 --- a/crates/trie/sparse/src/trie.rs +++ b/crates/trie/sparse/src/trie.rs @@ -199,6 +199,11 @@ impl RevealableSparseTrie { Some(self.as_revealed_mut()?.root()) } + /// Returns true if the root node is cached and does not need any recomputation. + pub fn is_root_cached(&self) -> bool { + self.as_revealed_ref().is_some_and(|trie| trie.is_root_cached()) + } + /// Returns the root hash along with any accumulated update information. /// /// This is useful for when you need both the root hash and information about @@ -965,6 +970,11 @@ impl SparseTrieTrait for SerialSparseTrie { } } + fn is_root_cached(&self) -> bool { + self.prefix_set.is_empty() && + self.nodes.get(&Nibbles::default()).is_some_and(|node| node.is_hash()) + } + fn update_subtrie_hashes(&mut self) { self.update_rlp_node_level(SPARSE_TRIE_SUBTRIE_HASHES_LEVEL); }