mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-04-30 03:01:58 -04:00
Compare commits
10 Commits
push
...
klkvr/inst
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
121e93eaf1 | ||
|
|
8d5e47c91e | ||
|
|
44b1759ef2 | ||
|
|
e4357ffa4c | ||
|
|
3733e85d60 | ||
|
|
de5d2c11f8 | ||
|
|
325513ba1a | ||
|
|
7bffd38300 | ||
|
|
3fcf7c1dae | ||
|
|
889d03527c |
@@ -233,6 +233,11 @@ pub(super) struct SparseTrieCacheTask<A = SerialSparseTrie, S = SerialSparseTrie
|
||||
account_updates: B256Map<LeafUpdate>,
|
||||
/// Storage trie updates. hashed address -> slot -> update.
|
||||
storage_updates: B256Map<B256Map<LeafUpdate>>,
|
||||
|
||||
/// Account updates that are buffered but were not yet applied to the trie.
|
||||
new_account_updates: B256Map<LeafUpdate>,
|
||||
/// Storage updates that are buffered but were not yet applied to the trie.
|
||||
new_storage_updates: B256Map<B256Map<LeafUpdate>>,
|
||||
/// Account updates that are blocked by storage root calculation or account reveal.
|
||||
///
|
||||
/// Those are being moved into `account_updates` once storage roots
|
||||
@@ -291,13 +296,15 @@ where
|
||||
max_targets_for_chunking: DEFAULT_MAX_TARGETS_FOR_CHUNKING,
|
||||
account_updates: Default::default(),
|
||||
storage_updates: Default::default(),
|
||||
new_account_updates: Default::default(),
|
||||
new_storage_updates: Default::default(),
|
||||
pending_account_updates: Default::default(),
|
||||
fetched_account_targets: Default::default(),
|
||||
fetched_storage_targets: Default::default(),
|
||||
account_rlp_buf: Vec::with_capacity(TRIE_ACCOUNT_RLP_MAX_SIZE),
|
||||
finished_state_updates: Default::default(),
|
||||
pending_targets: Default::default(),
|
||||
pending_updates: Default::default(),
|
||||
pending_updates: 0,
|
||||
metrics,
|
||||
}
|
||||
}
|
||||
@@ -381,18 +388,16 @@ where
|
||||
// If we don't have any pending messages, we can spend some time on computing
|
||||
// storage roots and promoting account updates.
|
||||
self.dispatch_pending_targets();
|
||||
self.process_new_updates()?;
|
||||
self.promote_pending_account_updates()?;
|
||||
self.dispatch_pending_targets();
|
||||
} else if self.updates.is_empty() || self.pending_updates > MAX_PENDING_UPDATES {
|
||||
// If we don't have any pending updates OR we've accumulated a lot already, apply
|
||||
// them to the trie,
|
||||
self.process_leaf_updates()?;
|
||||
self.process_new_updates()?;
|
||||
self.dispatch_pending_targets();
|
||||
} else if self.updates.is_empty() ||
|
||||
self.pending_targets.chunking_length() > self.chunk_size.unwrap_or_default()
|
||||
{
|
||||
// Make sure to dispatch targets if we don't have any updates or if we've
|
||||
// accumulated a lot of them.
|
||||
} else if self.pending_targets.chunking_length() > self.chunk_size.unwrap_or_default() {
|
||||
// Make sure to dispatch targets if we've accumulated a lot of them.
|
||||
self.dispatch_pending_targets();
|
||||
}
|
||||
|
||||
@@ -442,13 +447,13 @@ where
|
||||
|
||||
for target in targets.account_targets {
|
||||
// Only touch accounts that are not yet present in the updates set.
|
||||
self.account_updates.entry(target.key()).or_insert(LeafUpdate::Touched);
|
||||
self.new_account_updates.entry(target.key()).or_insert(LeafUpdate::Touched);
|
||||
}
|
||||
|
||||
for (address, slots) in targets.storage_targets {
|
||||
for slot in slots {
|
||||
// Only touch storages that are not yet present in the updates set.
|
||||
self.storage_updates
|
||||
self.new_storage_updates
|
||||
.entry(address)
|
||||
.or_default()
|
||||
.entry(slot.key())
|
||||
@@ -457,7 +462,7 @@ where
|
||||
|
||||
// Touch corresponding account leaf to make sure its revealed in accounts trie for
|
||||
// storage root update.
|
||||
self.account_updates.entry(address).or_insert(LeafUpdate::Touched);
|
||||
self.new_account_updates.entry(address).or_insert(LeafUpdate::Touched);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -478,15 +483,18 @@ where
|
||||
} else {
|
||||
alloy_rlp::encode_fixed_size(&value).to_vec()
|
||||
};
|
||||
self.storage_updates
|
||||
self.new_storage_updates
|
||||
.entry(address)
|
||||
.or_default()
|
||||
.insert(slot, LeafUpdate::Changed(encoded));
|
||||
|
||||
// Remove an existing storage update if it exists.
|
||||
self.storage_updates.get_mut(&address).and_then(|updates| updates.remove(&slot));
|
||||
}
|
||||
|
||||
// Make sure account is tracked in `account_updates` so that it is revealed in accounts
|
||||
// trie for storage root update.
|
||||
self.account_updates.entry(address).or_insert(LeafUpdate::Touched);
|
||||
self.new_account_updates.entry(address).or_insert(LeafUpdate::Touched);
|
||||
|
||||
// Make sure account is tracked in `pending_account_updates` so that once storage root
|
||||
// is computed, it will be updated in the accounts trie.
|
||||
@@ -498,7 +506,7 @@ where
|
||||
//
|
||||
// This might overwrite an existing update, which is fine, because storage root from it
|
||||
// is already tracked in the trie and can be easily fetched again.
|
||||
self.account_updates.insert(address, LeafUpdate::Touched);
|
||||
self.new_account_updates.insert(address, LeafUpdate::Touched);
|
||||
|
||||
// Track account in `pending_account_updates` so that once storage root is computed,
|
||||
// it will be updated in the accounts trie.
|
||||
@@ -515,6 +523,50 @@ where
|
||||
})
|
||||
}
|
||||
|
||||
#[instrument(
|
||||
level = "debug",
|
||||
target = "engine::tree::payload_processor::sparse_trie",
|
||||
skip_all
|
||||
)]
|
||||
fn process_new_updates(&mut self) -> SparseTrieResult<()> {
|
||||
self.pending_updates = 0;
|
||||
|
||||
// Firstly apply all new storage and account updates to the tries.
|
||||
self.process_leaf_updates(true)?;
|
||||
|
||||
for (address, mut new) in self.new_storage_updates.drain() {
|
||||
let updates = self.storage_updates.entry(address).or_default();
|
||||
for (slot, new) in new.drain() {
|
||||
match updates.entry(slot) {
|
||||
Entry::Occupied(mut entry) => {
|
||||
// Only overwrite existing entries with new values
|
||||
if new.is_changed() {
|
||||
entry.insert(new);
|
||||
}
|
||||
}
|
||||
Entry::Vacant(entry) => {
|
||||
entry.insert(new);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for (address, new) in self.new_account_updates.drain() {
|
||||
match self.account_updates.entry(address) {
|
||||
Entry::Occupied(mut entry) => {
|
||||
if new.is_changed() {
|
||||
entry.insert(new);
|
||||
}
|
||||
}
|
||||
Entry::Vacant(entry) => {
|
||||
entry.insert(new);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Applies all account and storage leaf updates to corresponding tries and collects any new
|
||||
/// multiproof targets.
|
||||
#[instrument(
|
||||
@@ -522,12 +574,12 @@ where
|
||||
target = "engine::tree::payload_processor::sparse_trie",
|
||||
skip_all
|
||||
)]
|
||||
fn process_leaf_updates(&mut self) -> SparseTrieResult<()> {
|
||||
self.pending_updates = 0;
|
||||
fn process_leaf_updates(&mut self, new: bool) -> SparseTrieResult<()> {
|
||||
let storage_updates =
|
||||
if new { &mut self.new_storage_updates } else { &mut self.storage_updates };
|
||||
|
||||
// Start with processing all storage updates in parallel.
|
||||
let storage_results = self
|
||||
.storage_updates
|
||||
let storage_results = storage_updates
|
||||
.iter_mut()
|
||||
.map(|(address, updates)| {
|
||||
let trie = self.trie.take_or_create_storage_trie(address);
|
||||
@@ -566,7 +618,7 @@ where
|
||||
}
|
||||
|
||||
// Process account trie updates and fill the account targets.
|
||||
self.process_account_leaf_updates()?;
|
||||
self.process_account_leaf_updates(new)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -574,12 +626,14 @@ where
|
||||
/// Invokes `update_leaves` for the accounts trie and collects any new targets.
|
||||
///
|
||||
/// Returns whether any updates were drained (applied to the trie).
|
||||
fn process_account_leaf_updates(&mut self) -> SparseTrieResult<bool> {
|
||||
let updates_len_before = self.account_updates.len();
|
||||
fn process_account_leaf_updates(&mut self, new: bool) -> SparseTrieResult<bool> {
|
||||
let account_updates =
|
||||
if new { &mut self.new_account_updates } else { &mut self.account_updates };
|
||||
|
||||
self.trie.trie_mut().update_leaves(
|
||||
&mut self.account_updates,
|
||||
|target, min_len| match self.fetched_account_targets.entry(target) {
|
||||
let updates_len_before = account_updates.len();
|
||||
|
||||
self.trie.trie_mut().update_leaves(account_updates, |target, min_len| {
|
||||
match self.fetched_account_targets.entry(target) {
|
||||
Entry::Occupied(mut entry) => {
|
||||
if min_len < *entry.get() {
|
||||
entry.insert(min_len);
|
||||
@@ -594,10 +648,10 @@ where
|
||||
.account_targets
|
||||
.push(Target::new(target).with_min_len(min_len));
|
||||
}
|
||||
},
|
||||
)?;
|
||||
}
|
||||
})?;
|
||||
|
||||
Ok(self.account_updates.len() < updates_len_before)
|
||||
Ok(account_updates.len() < updates_len_before)
|
||||
}
|
||||
|
||||
/// Iterates through all storage tries for which all updates were processed, computes their
|
||||
@@ -609,7 +663,7 @@ where
|
||||
skip_all
|
||||
)]
|
||||
fn promote_pending_account_updates(&mut self) -> SparseTrieResult<()> {
|
||||
self.process_leaf_updates()?;
|
||||
self.process_leaf_updates(false)?;
|
||||
|
||||
if self.pending_account_updates.is_empty() {
|
||||
return Ok(());
|
||||
@@ -702,7 +756,7 @@ where
|
||||
//
|
||||
// We need to keep iterating if any updates are being drained because that might
|
||||
// indicate that more pending account updates can be promoted.
|
||||
if !self.process_account_leaf_updates()? {
|
||||
if !self.process_account_leaf_updates(false)? {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
@@ -19,7 +19,7 @@ use reth_trie_sparse::{
|
||||
};
|
||||
use smallvec::SmallVec;
|
||||
use std::cmp::{Ord, Ordering, PartialOrd};
|
||||
use tracing::{debug, instrument, trace};
|
||||
use tracing::{debug, debug_span, instrument, trace};
|
||||
|
||||
/// The maximum length of a path, in nibbles, which belongs to the upper subtrie of a
|
||||
/// [`ParallelSparseTrie`]. All longer paths belong to a lower subtrie.
|
||||
@@ -1228,6 +1228,7 @@ impl SparseTrieExt for ParallelSparseTrie {
|
||||
nodes_converted
|
||||
}
|
||||
|
||||
#[instrument(level = "debug", target = "reth_trie_parallel::trie", skip_all)]
|
||||
fn update_leaves(
|
||||
&mut self,
|
||||
updates: &mut alloy_primitives::map::B256Map<reth_trie_sparse::LeafUpdate>,
|
||||
@@ -1244,6 +1245,9 @@ impl SparseTrieExt for ParallelSparseTrie {
|
||||
// Remove upfront - we'll re-insert if the operation fails due to blinded node.
|
||||
let update = updates.remove(&key).unwrap();
|
||||
|
||||
let is_changed = matches!(update, LeafUpdate::Changed(_));
|
||||
let _span = debug_span!("processing leaf update", ?key, ?is_changed).entered();
|
||||
|
||||
match update {
|
||||
LeafUpdate::Changed(value) => {
|
||||
if value.is_empty() {
|
||||
|
||||
@@ -24,6 +24,18 @@ pub enum LeafUpdate {
|
||||
Touched,
|
||||
}
|
||||
|
||||
impl LeafUpdate {
|
||||
/// Returns true if the leaf update is a change.
|
||||
pub const fn is_changed(&self) -> bool {
|
||||
matches!(self, Self::Changed(_))
|
||||
}
|
||||
|
||||
/// Returns true if the leaf update is a touched update.
|
||||
pub const fn is_touched(&self) -> bool {
|
||||
matches!(self, Self::Touched)
|
||||
}
|
||||
}
|
||||
|
||||
/// Trait defining common operations for revealed sparse trie implementations.
|
||||
///
|
||||
/// This trait abstracts over different sparse trie implementations (serial vs parallel)
|
||||
|
||||
Reference in New Issue
Block a user