Compare commits

...

8 Commits

Author SHA1 Message Date
Arsenii Kulikov
c9c34d4b7e fix 2026-02-06 04:17:03 +04:00
Arsenii Kulikov
3eb63bd2ef min_len 2026-02-06 03:36:12 +04:00
Arsenii Kulikov
1dc2f14897 Merge branch 'main' into klkvr/join-leaf-updates 2026-02-06 02:02:33 +04:00
Arsenii Kulikov
623193f87c wip 2026-02-06 01:58:46 +04:00
Arsenii Kulikov
8849f7db59 wip 2026-02-05 23:23:28 +04:00
Arsenii Kulikov
0136fcf859 wip 2026-02-05 22:02:51 +04:00
Arsenii Kulikov
bd449e8b91 swap 2026-02-05 21:52:04 +04:00
Arsenii Kulikov
f0171eff76 perf: join leaf updates 2026-02-05 19:57:04 +04:00
2 changed files with 155 additions and 40 deletions

View File

@@ -11,7 +11,9 @@ use crate::tree::{
use alloy_primitives::B256;
use alloy_rlp::{Decodable, Encodable};
use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
use rayon::iter::{IntoParallelRefMutIterator, ParallelIterator};
use rayon::iter::{
IndexedParallelIterator, IntoParallelIterator, IntoParallelRefMutIterator, ParallelIterator,
};
use reth_primitives_traits::{Account, ParallelBridgeBuffered};
use reth_trie::{
proof_v2::Target, updates::TrieUpdates, DecodedMultiProofV2, HashedPostState, Nibbles,
@@ -620,51 +622,140 @@ where
/// Applies the pending leaf updates to the storage tries and to the account trie, and records
/// every proof target reported by `update_leaves` in `self.pending_targets`.
///
/// `new` selects the `new_storage_updates` / `new_account_updates` maps instead of
/// `storage_updates` / `account_updates`.
///
/// NOTE(review): this listing is a rendered diff without +/- markers, so statements from the
/// previous implementation and from its replacement are interleaved below.
fn process_leaf_updates(&mut self, new: bool) -> SparseTrieResult<()> {
let storage_updates =
if new { &mut self.new_storage_updates } else { &mut self.storage_updates };
let account_updates =
if new { &mut self.new_account_updates } else { &mut self.account_updates };
// Process all storage updates in parallel, skipping tries with no pending updates.
let storage_results = storage_updates
.iter_mut()
.filter(|(_, updates)| !updates.is_empty())
.map(|(address, updates)| {
let trie = self.trie.take_or_create_storage_trie(address);
let fetched = self.fetched_storage_targets.remove(address).unwrap_or_default();
let (account_trie, storage_tries) = self.trie.tries_mut();
(address, updates, fetched, trie)
})
.par_bridge_buffered()
.map(|(address, updates, mut fetched, mut trie)| {
let mut targets = Vec::new();
let num_account_updates = account_updates.len();
let num_storage_updates =
storage_updates.values().map(|updates| updates.len()).sum::<usize>();
trie.update_leaves(updates, |path, min_len| match fetched.entry(path) {
Entry::Occupied(mut entry) => {
if min_len < *entry.get() {
entry.insert(min_len);
targets.push(Target::new(path).with_min_len(min_len));
let parent_span = tracing::Span::current();
// The two closures are only handed to `rayon::join` when both the storage and the account
// update counts exceed 100; otherwise `maybe_join` calls them one after the other.
//
// NOTE(review): the first closure processes storage updates and the second one processes
// account updates, yet the tuple binds them as `(accounts_result, storages_result)`, so the
// names look swapped. Both values are combined with `and` at the end, so this only affects
// which error is returned when both sides fail.
let (accounts_result, storages_result) = maybe_join(
num_storage_updates.min(num_account_updates) > 100,
|| {
let _updates_span =
debug_span!(parent: &parent_span, "process_storage_leaf_updates", num_updates = num_storage_updates)
.entered();
// More than 100 storage updates: take each trie out of `storage_tries`, update the tries
// through a rayon parallel iterator, then put them back. Otherwise update them in place.
if num_storage_updates > 100 {
// Process all storage updates in parallel, skipping tries with no pending
// updates.
let storage_updates = storage_updates
.iter_mut()
.filter(|(_, updates)| !updates.is_empty())
.map(|(address, updates)| {
let trie = storage_tries.take_or_create_trie(address);
let fetched =
self.fetched_storage_targets.remove(address).unwrap_or_default();
(address, updates, fetched, trie)
})
.collect::<Vec<_>>();
let total_items = storage_updates.len();
let storage_results = storage_updates
.into_par_iter()
.with_min_len(total_items / 2)
.map(|(address, updates, mut fetched, mut trie)| {
let _span = debug_span!(parent: &_updates_span, "process_storage_leaf_updates", num_updates = updates.len()).entered();
let mut targets = Vec::new();
trie.update_leaves(updates, |path, min_len| match fetched.entry(path) {
Entry::Occupied(mut entry) => {
if min_len < *entry.get() {
entry.insert(min_len);
targets.push(Target::new(path).with_min_len(min_len));
}
}
Entry::Vacant(entry) => {
entry.insert(min_len);
targets.push(Target::new(path).with_min_len(min_len));
}
})?;
SparseTrieResult::Ok((address, targets, fetched, trie))
})
.collect::<Result<Vec<_>, _>>()?;
// Hand the fetched-target maps and the tries back to their owners and queue the new targets.
for (address, targets, fetched, trie) in storage_results {
self.fetched_storage_targets.insert(*address, fetched);
storage_tries.insert_trie(*address, trie);
if !targets.is_empty() {
self.pending_targets
.storage_targets
.entry(*address)
.or_default()
.extend(targets);
}
}
Entry::Vacant(entry) => {
entry.insert(min_len);
targets.push(Target::new(path).with_min_len(min_len));
} else {
for (address, updates) in storage_updates.iter_mut() {
if updates.is_empty() {
continue;
}
let _span = debug_span!(parent: &_updates_span, "process_storage_leaf_updates", num_updates = updates.len()).entered();
let trie = storage_tries.get_or_create_trie_mut(*address);
let fetched = self.fetched_storage_targets.entry(*address).or_default();
let mut targets = Vec::new();
// A target is queued when the path has no recorded `min_len` yet or when the callback
// reports a smaller `min_len` than the recorded one.
trie.update_leaves(updates, |path, min_len| match fetched.entry(path) {
Entry::Occupied(mut entry) => {
if min_len < *entry.get() {
entry.insert(min_len);
targets.push(Target::new(path).with_min_len(min_len));
}
}
Entry::Vacant(entry) => {
entry.insert(min_len);
targets.push(Target::new(path).with_min_len(min_len));
}
})?;
if !targets.is_empty() {
self.pending_targets
.storage_targets
.entry(*address)
.or_default()
.extend(targets);
}
}
})?;
}
SparseTrieResult::Ok((address, targets, fetched, trie))
})
.collect::<Result<Vec<_>, _>>()?;
Ok(())
},
|| {
let _span =
debug_span!(parent: &parent_span, "process_account_leaf_updates", num_updates = num_account_updates)
.entered();
for (address, targets, fetched, trie) in storage_results {
self.fetched_storage_targets.insert(*address, fetched);
self.trie.insert_storage_trie(*address, trie);
// Same bookkeeping as for storage tries, but targets go straight into
// `pending_targets.account_targets`.
account_trie.update_leaves(account_updates, |target, min_len| {
match self.fetched_account_targets.entry(target) {
Entry::Occupied(mut entry) => {
if min_len < *entry.get() {
entry.insert(min_len);
self.pending_targets
.account_targets
.push(Target::new(target).with_min_len(min_len));
}
}
Entry::Vacant(entry) => {
entry.insert(min_len);
self.pending_targets
.account_targets
.push(Target::new(target).with_min_len(min_len));
}
}
})
},
);
if !targets.is_empty() {
self.pending_targets.storage_targets.entry(*address).or_default().extend(targets);
}
}
// Process account trie updates and fill the account targets.
self.process_account_leaf_updates(new)?;
Ok(())
accounts_result.and(storages_result)
}
/// Invokes `update_leaves` for the accounts trie and collects any new targets.
@@ -1017,3 +1108,17 @@ where
Ok(elapsed)
}
/// Runs the closures `a` and `b` and returns both results as a pair.
///
/// When `should_parallelize` is `true` the closures are handed to [`rayon::join`]; otherwise
/// `a` is called first and `b` second on the current thread.
fn maybe_join<A, B, RA, RB>(should_parallelize: bool, a: A, b: B) -> (RA, RB)
where
    A: FnOnce() -> RA + Send,
    B: FnOnce() -> RB + Send,
    RA: Send,
    RB: Send,
{
    // Sequential path: keep the evaluation order explicit, `a` before `b`.
    if !should_parallelize {
        let first = a();
        let second = b();
        return (first, second);
    }

    rayon::join(a, b)
}

View File

@@ -223,6 +223,11 @@ where
&mut self.storage.tries
}
/// Borrows the account trie (`state`) and the storage tries (`storage`) mutably at the same
/// time, returned as `(account trie, storage tries)`.
pub const fn tries_mut(&mut self) -> (&mut RevealableSparseTrie<A>, &mut StorageTries<S>) {
    // The two fields are disjoint, so both mutable borrows can be handed out together.
    let account_trie = &mut self.state;
    let storage_tries = &mut self.storage;
    (account_trie, storage_tries)
}
/// Takes the storage trie for the provided address.
pub fn take_storage_trie(&mut self, address: &B256) -> Option<RevealableSparseTrie<S>> {
self.storage.tries.remove(address)
@@ -1179,7 +1184,7 @@ where
/// of [`SparseStateTrie`] both to help enforce allocation re-use and to allow us to implement
/// methods like `get_trie_and_revealed_paths` which return multiple mutable borrows.
#[derive(Debug, Default)]
struct StorageTries<S = SerialSparseTrie> {
pub struct StorageTries<S = SerialSparseTrie> {
/// Sparse storage tries.
tries: B256Map<RevealableSparseTrie<S>>,
/// Cleared storage tries, kept for re-use.
@@ -1376,7 +1381,7 @@ impl<S: SparseTrieTrait + Clone> StorageTries<S> {
}
// Returns mutable reference to storage sparse trie, creating a blind one if it doesn't exist.
fn get_or_create_trie_mut(&mut self, address: B256) -> &mut RevealableSparseTrie<S> {
pub fn get_or_create_trie_mut(&mut self, address: B256) -> &mut RevealableSparseTrie<S> {
self.tries.entry(address).or_insert_with(|| {
self.cleared_tries.pop().unwrap_or_else(|| self.default_trie.clone())
})
@@ -1385,12 +1390,17 @@ impl<S: SparseTrieTrait + Clone> StorageTries<S> {
/// Takes the storage trie for the account from the internal `HashMap`, creating it if it
/// doesn't already exist.
#[cfg(feature = "std")]
fn take_or_create_trie(&mut self, account: &B256) -> RevealableSparseTrie<S> {
pub fn take_or_create_trie(&mut self, account: &B256) -> RevealableSparseTrie<S> {
// The trie is removed from `tries`; if none is stored, a trie popped from `cleared_tries` is
// reused before falling back to cloning `default_trie`.
self.tries.remove(account).unwrap_or_else(|| {
self.cleared_tries.pop().unwrap_or_else(|| self.default_trie.clone())
})
}
/// Stores `trie` as the storage trie of `account` in the internal map.
pub fn insert_trie(&mut self, account: B256, trie: RevealableSparseTrie<S>) {
    // Whatever `insert` returns for this key is intentionally discarded.
    let _previous = self.tries.insert(account, trie);
}
/// Takes the revealed paths set from the account from the internal `HashMap`, creating one if
/// it doesn't exist.
#[cfg(feature = "std")]