mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-04-30 03:01:58 -04:00
Compare commits
8 Commits
devnet4
...
klkvr/join
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c9c34d4b7e | ||
|
|
3eb63bd2ef | ||
|
|
1dc2f14897 | ||
|
|
623193f87c | ||
|
|
8849f7db59 | ||
|
|
0136fcf859 | ||
|
|
bd449e8b91 | ||
|
|
f0171eff76 |
@@ -11,7 +11,9 @@ use crate::tree::{
|
||||
use alloy_primitives::B256;
|
||||
use alloy_rlp::{Decodable, Encodable};
|
||||
use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
|
||||
use rayon::iter::{IntoParallelRefMutIterator, ParallelIterator};
|
||||
use rayon::iter::{
|
||||
IndexedParallelIterator, IntoParallelIterator, IntoParallelRefMutIterator, ParallelIterator,
|
||||
};
|
||||
use reth_primitives_traits::{Account, ParallelBridgeBuffered};
|
||||
use reth_trie::{
|
||||
proof_v2::Target, updates::TrieUpdates, DecodedMultiProofV2, HashedPostState, Nibbles,
|
||||
@@ -620,51 +622,140 @@ where
|
||||
fn process_leaf_updates(&mut self, new: bool) -> SparseTrieResult<()> {
|
||||
let storage_updates =
|
||||
if new { &mut self.new_storage_updates } else { &mut self.storage_updates };
|
||||
let account_updates =
|
||||
if new { &mut self.new_account_updates } else { &mut self.account_updates };
|
||||
|
||||
// Process all storage updates in parallel, skipping tries with no pending updates.
|
||||
let storage_results = storage_updates
|
||||
.iter_mut()
|
||||
.filter(|(_, updates)| !updates.is_empty())
|
||||
.map(|(address, updates)| {
|
||||
let trie = self.trie.take_or_create_storage_trie(address);
|
||||
let fetched = self.fetched_storage_targets.remove(address).unwrap_or_default();
|
||||
let (account_trie, storage_tries) = self.trie.tries_mut();
|
||||
|
||||
(address, updates, fetched, trie)
|
||||
})
|
||||
.par_bridge_buffered()
|
||||
.map(|(address, updates, mut fetched, mut trie)| {
|
||||
let mut targets = Vec::new();
|
||||
let num_account_updates = account_updates.len();
|
||||
let num_storage_updates =
|
||||
storage_updates.values().map(|updates| updates.len()).sum::<usize>();
|
||||
|
||||
trie.update_leaves(updates, |path, min_len| match fetched.entry(path) {
|
||||
Entry::Occupied(mut entry) => {
|
||||
if min_len < *entry.get() {
|
||||
entry.insert(min_len);
|
||||
targets.push(Target::new(path).with_min_len(min_len));
|
||||
let parent_span = tracing::Span::current();
|
||||
let (accounts_result, storages_result) = maybe_join(
|
||||
num_storage_updates.min(num_account_updates) > 100,
|
||||
|| {
|
||||
let _updates_span =
|
||||
debug_span!(parent: &parent_span, "process_storage_leaf_updates", num_updates = num_storage_updates)
|
||||
.entered();
|
||||
|
||||
if num_storage_updates > 100 {
|
||||
// Process all storage updates in parallel, skipping tries with no pending
|
||||
// updates.
|
||||
let storage_updates = storage_updates
|
||||
.iter_mut()
|
||||
.filter(|(_, updates)| !updates.is_empty())
|
||||
.map(|(address, updates)| {
|
||||
let trie = storage_tries.take_or_create_trie(address);
|
||||
let fetched =
|
||||
self.fetched_storage_targets.remove(address).unwrap_or_default();
|
||||
|
||||
(address, updates, fetched, trie)
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let total_items = storage_updates.len();
|
||||
|
||||
let storage_results = storage_updates
|
||||
.into_par_iter()
|
||||
.with_min_len(total_items / 2)
|
||||
.map(|(address, updates, mut fetched, mut trie)| {
|
||||
let _span = debug_span!(parent: &_updates_span, "process_storage_leaf_updates", num_updates = updates.len()).entered();
|
||||
|
||||
let mut targets = Vec::new();
|
||||
|
||||
trie.update_leaves(updates, |path, min_len| match fetched.entry(path) {
|
||||
Entry::Occupied(mut entry) => {
|
||||
if min_len < *entry.get() {
|
||||
entry.insert(min_len);
|
||||
targets.push(Target::new(path).with_min_len(min_len));
|
||||
}
|
||||
}
|
||||
Entry::Vacant(entry) => {
|
||||
entry.insert(min_len);
|
||||
targets.push(Target::new(path).with_min_len(min_len));
|
||||
}
|
||||
})?;
|
||||
|
||||
SparseTrieResult::Ok((address, targets, fetched, trie))
|
||||
})
|
||||
.collect::<Result<Vec<_>, _>>()?;
|
||||
|
||||
for (address, targets, fetched, trie) in storage_results {
|
||||
self.fetched_storage_targets.insert(*address, fetched);
|
||||
storage_tries.insert_trie(*address, trie);
|
||||
|
||||
if !targets.is_empty() {
|
||||
self.pending_targets
|
||||
.storage_targets
|
||||
.entry(*address)
|
||||
.or_default()
|
||||
.extend(targets);
|
||||
}
|
||||
}
|
||||
Entry::Vacant(entry) => {
|
||||
entry.insert(min_len);
|
||||
targets.push(Target::new(path).with_min_len(min_len));
|
||||
} else {
|
||||
for (address, updates) in storage_updates.iter_mut() {
|
||||
if updates.is_empty() {
|
||||
continue;
|
||||
}
|
||||
let _span = debug_span!(parent: &_updates_span, "process_storage_leaf_updates", num_updates = updates.len()).entered();
|
||||
|
||||
let trie = storage_tries.get_or_create_trie_mut(*address);
|
||||
let fetched = self.fetched_storage_targets.entry(*address).or_default();
|
||||
let mut targets = Vec::new();
|
||||
|
||||
trie.update_leaves(updates, |path, min_len| match fetched.entry(path) {
|
||||
Entry::Occupied(mut entry) => {
|
||||
if min_len < *entry.get() {
|
||||
entry.insert(min_len);
|
||||
targets.push(Target::new(path).with_min_len(min_len));
|
||||
}
|
||||
}
|
||||
Entry::Vacant(entry) => {
|
||||
entry.insert(min_len);
|
||||
targets.push(Target::new(path).with_min_len(min_len));
|
||||
}
|
||||
})?;
|
||||
|
||||
if !targets.is_empty() {
|
||||
self.pending_targets
|
||||
.storage_targets
|
||||
.entry(*address)
|
||||
.or_default()
|
||||
.extend(targets);
|
||||
}
|
||||
}
|
||||
})?;
|
||||
}
|
||||
|
||||
SparseTrieResult::Ok((address, targets, fetched, trie))
|
||||
})
|
||||
.collect::<Result<Vec<_>, _>>()?;
|
||||
Ok(())
|
||||
},
|
||||
|| {
|
||||
let _span =
|
||||
debug_span!(parent: &parent_span, "process_account_leaf_updates", num_updates = num_account_updates)
|
||||
.entered();
|
||||
|
||||
for (address, targets, fetched, trie) in storage_results {
|
||||
self.fetched_storage_targets.insert(*address, fetched);
|
||||
self.trie.insert_storage_trie(*address, trie);
|
||||
account_trie.update_leaves(account_updates, |target, min_len| {
|
||||
match self.fetched_account_targets.entry(target) {
|
||||
Entry::Occupied(mut entry) => {
|
||||
if min_len < *entry.get() {
|
||||
entry.insert(min_len);
|
||||
self.pending_targets
|
||||
.account_targets
|
||||
.push(Target::new(target).with_min_len(min_len));
|
||||
}
|
||||
}
|
||||
Entry::Vacant(entry) => {
|
||||
entry.insert(min_len);
|
||||
self.pending_targets
|
||||
.account_targets
|
||||
.push(Target::new(target).with_min_len(min_len));
|
||||
}
|
||||
}
|
||||
})
|
||||
},
|
||||
);
|
||||
|
||||
if !targets.is_empty() {
|
||||
self.pending_targets.storage_targets.entry(*address).or_default().extend(targets);
|
||||
}
|
||||
}
|
||||
|
||||
// Process account trie updates and fill the account targets.
|
||||
self.process_account_leaf_updates(new)?;
|
||||
|
||||
Ok(())
|
||||
accounts_result.and(storages_result)
|
||||
}
|
||||
|
||||
/// Invokes `update_leaves` for the accounts trie and collects any new targets.
|
||||
@@ -1017,3 +1108,17 @@ where
|
||||
|
||||
Ok(elapsed)
|
||||
}
|
||||
|
||||
fn maybe_join<A, B, RA, RB>(should_parallelize: bool, a: A, b: B) -> (RA, RB)
|
||||
where
|
||||
A: FnOnce() -> RA + Send,
|
||||
B: FnOnce() -> RB + Send,
|
||||
RA: Send,
|
||||
RB: Send,
|
||||
{
|
||||
if should_parallelize {
|
||||
rayon::join(a, b)
|
||||
} else {
|
||||
(a(), b())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -223,6 +223,11 @@ where
|
||||
&mut self.storage.tries
|
||||
}
|
||||
|
||||
/// Returns mutable references to both the account trie and storage tries.
|
||||
pub const fn tries_mut(&mut self) -> (&mut RevealableSparseTrie<A>, &mut StorageTries<S>) {
|
||||
(&mut self.state, &mut self.storage)
|
||||
}
|
||||
|
||||
/// Takes the storage trie for the provided address.
|
||||
pub fn take_storage_trie(&mut self, address: &B256) -> Option<RevealableSparseTrie<S>> {
|
||||
self.storage.tries.remove(address)
|
||||
@@ -1179,7 +1184,7 @@ where
|
||||
/// of [`SparseStateTrie`] both to help enforce allocation re-use and to allow us to implement
|
||||
/// methods like `get_trie_and_revealed_paths` which return multiple mutable borrows.
|
||||
#[derive(Debug, Default)]
|
||||
struct StorageTries<S = SerialSparseTrie> {
|
||||
pub struct StorageTries<S = SerialSparseTrie> {
|
||||
/// Sparse storage tries.
|
||||
tries: B256Map<RevealableSparseTrie<S>>,
|
||||
/// Cleared storage tries, kept for re-use.
|
||||
@@ -1376,7 +1381,7 @@ impl<S: SparseTrieTrait + Clone> StorageTries<S> {
|
||||
}
|
||||
|
||||
// Returns mutable reference to storage sparse trie, creating a blind one if it doesn't exist.
|
||||
fn get_or_create_trie_mut(&mut self, address: B256) -> &mut RevealableSparseTrie<S> {
|
||||
pub fn get_or_create_trie_mut(&mut self, address: B256) -> &mut RevealableSparseTrie<S> {
|
||||
self.tries.entry(address).or_insert_with(|| {
|
||||
self.cleared_tries.pop().unwrap_or_else(|| self.default_trie.clone())
|
||||
})
|
||||
@@ -1385,12 +1390,17 @@ impl<S: SparseTrieTrait + Clone> StorageTries<S> {
|
||||
/// Takes the storage trie for the account from the internal `HashMap`, creating it if it
|
||||
/// doesn't already exist.
|
||||
#[cfg(feature = "std")]
|
||||
fn take_or_create_trie(&mut self, account: &B256) -> RevealableSparseTrie<S> {
|
||||
pub fn take_or_create_trie(&mut self, account: &B256) -> RevealableSparseTrie<S> {
|
||||
self.tries.remove(account).unwrap_or_else(|| {
|
||||
self.cleared_tries.pop().unwrap_or_else(|| self.default_trie.clone())
|
||||
})
|
||||
}
|
||||
|
||||
/// Inserts the storage trie for the account into the internal `HashMap`.
|
||||
pub fn insert_trie(&mut self, account: B256, trie: RevealableSparseTrie<S>) {
|
||||
self.tries.insert(account, trie);
|
||||
}
|
||||
|
||||
/// Takes the revealed paths set from the account from the internal `HashMap`, creating one if
|
||||
/// it doesn't exist.
|
||||
#[cfg(feature = "std")]
|
||||
|
||||
Reference in New Issue
Block a user