perf(trie): process new updates from state/prewarm update directly (#21768)

This commit is contained in:
Arsenii Kulikov
2026-02-05 03:39:44 +04:00
committed by GitHub
parent 261ca8b4e3
commit a5978c593e
2 changed files with 93 additions and 27 deletions

View File

@@ -236,6 +236,11 @@ pub(super) struct SparseTrieCacheTask<A = SerialSparseTrie, S = SerialSparseTrie
account_updates: B256Map<LeafUpdate>,
/// Storage trie updates. hashed address -> slot -> update.
storage_updates: B256Map<B256Map<LeafUpdate>>,
/// Account updates that are buffered but were not yet applied to the trie.
new_account_updates: B256Map<LeafUpdate>,
/// Storage updates that are buffered but were not yet applied to the trie.
new_storage_updates: B256Map<B256Map<LeafUpdate>>,
/// Account updates that are blocked by storage root calculation or account reveal.
///
/// Those are being moved into `account_updates` once storage roots
@@ -294,6 +299,8 @@ where
max_targets_for_chunking: DEFAULT_MAX_TARGETS_FOR_CHUNKING,
account_updates: Default::default(),
storage_updates: Default::default(),
new_account_updates: Default::default(),
new_storage_updates: Default::default(),
pending_account_updates: Default::default(),
fetched_account_targets: Default::default(),
fetched_storage_targets: Default::default(),
@@ -389,18 +396,16 @@ where
// If we don't have any pending messages, we can spend some time on computing
// storage roots and promoting account updates.
self.dispatch_pending_targets();
self.process_new_updates()?;
self.promote_pending_account_updates()?;
self.dispatch_pending_targets();
} else if self.updates.is_empty() || self.pending_updates > MAX_PENDING_UPDATES {
// If we don't have any pending updates OR we've accumulated a lot already, apply
// them to the trie,
self.process_leaf_updates()?;
self.process_new_updates()?;
self.dispatch_pending_targets();
} else if self.updates.is_empty() ||
self.pending_targets.chunking_length() > self.chunk_size.unwrap_or_default()
{
// Make sure to dispatch targets if we don't have any updates or if we've
// accumulated a lot of them.
} else if self.pending_targets.chunking_length() > self.chunk_size.unwrap_or_default() {
// Make sure to dispatch targets if we've accumulated a lot of them.
self.dispatch_pending_targets();
}
@@ -450,13 +455,13 @@ where
for target in targets.account_targets {
// Only touch accounts that are not yet present in the updates set.
self.account_updates.entry(target.key()).or_insert(LeafUpdate::Touched);
self.new_account_updates.entry(target.key()).or_insert(LeafUpdate::Touched);
}
for (address, slots) in targets.storage_targets {
for slot in slots {
// Only touch storages that are not yet present in the updates set.
self.storage_updates
self.new_storage_updates
.entry(address)
.or_default()
.entry(slot.key())
@@ -465,7 +470,7 @@ where
// Touch corresponding account leaf to make sure its revealed in accounts trie for
// storage root update.
self.account_updates.entry(address).or_insert(LeafUpdate::Touched);
self.new_account_updates.entry(address).or_insert(LeafUpdate::Touched);
}
}
@@ -486,15 +491,18 @@ where
} else {
alloy_rlp::encode_fixed_size(&value).to_vec()
};
self.storage_updates
self.new_storage_updates
.entry(address)
.or_default()
.insert(slot, LeafUpdate::Changed(encoded));
// Remove an existing storage update if it exists.
self.storage_updates.get_mut(&address).and_then(|updates| updates.remove(&slot));
}
// Make sure account is tracked in `account_updates` so that it is revealed in accounts
// trie for storage root update.
self.account_updates.entry(address).or_insert(LeafUpdate::Touched);
self.new_account_updates.entry(address).or_insert(LeafUpdate::Touched);
// Make sure account is tracked in `pending_account_updates` so that once storage root
// is computed, it will be updated in the accounts trie.
@@ -506,7 +514,7 @@ where
//
// This might overwrite an existing update, which is fine, because storage root from it
// is already tracked in the trie and can be easily fetched again.
self.account_updates.insert(address, LeafUpdate::Touched);
self.new_account_updates.insert(address, LeafUpdate::Touched);
// Track account in `pending_account_updates` so that once storage root is computed,
// it will be updated in the accounts trie.
@@ -523,6 +531,50 @@ where
})
}
#[instrument(
level = "debug",
target = "engine::tree::payload_processor::sparse_trie",
skip_all
)]
fn process_new_updates(&mut self) -> SparseTrieResult<()> {
self.pending_updates = 0;
// Firstly apply all new storage and account updates to the tries.
self.process_leaf_updates(true)?;
for (address, mut new) in self.new_storage_updates.drain() {
let updates = self.storage_updates.entry(address).or_default();
for (slot, new) in new.drain() {
match updates.entry(slot) {
Entry::Occupied(mut entry) => {
// Only overwrite existing entries with new values
if new.is_changed() {
entry.insert(new);
}
}
Entry::Vacant(entry) => {
entry.insert(new);
}
}
}
}
for (address, new) in self.new_account_updates.drain() {
match self.account_updates.entry(address) {
Entry::Occupied(mut entry) => {
if new.is_changed() {
entry.insert(new);
}
}
Entry::Vacant(entry) => {
entry.insert(new);
}
}
}
Ok(())
}
/// Applies all account and storage leaf updates to corresponding tries and collects any new
/// multiproof targets.
#[instrument(
@@ -530,12 +582,12 @@ where
target = "engine::tree::payload_processor::sparse_trie",
skip_all
)]
fn process_leaf_updates(&mut self) -> SparseTrieResult<()> {
self.pending_updates = 0;
fn process_leaf_updates(&mut self, new: bool) -> SparseTrieResult<()> {
let storage_updates =
if new { &mut self.new_storage_updates } else { &mut self.storage_updates };
// Process all storage updates in parallel, skipping tries with no pending updates.
let storage_results = self
.storage_updates
let storage_results = storage_updates
.iter_mut()
.filter(|(_, updates)| !updates.is_empty())
.map(|(address, updates)| {
@@ -575,7 +627,7 @@ where
}
// Process account trie updates and fill the account targets.
self.process_account_leaf_updates()?;
self.process_account_leaf_updates(new)?;
Ok(())
}
@@ -583,12 +635,14 @@ where
/// Invokes `update_leaves` for the accounts trie and collects any new targets.
///
/// Returns whether any updates were drained (applied to the trie).
fn process_account_leaf_updates(&mut self) -> SparseTrieResult<bool> {
let updates_len_before = self.account_updates.len();
fn process_account_leaf_updates(&mut self, new: bool) -> SparseTrieResult<bool> {
let account_updates =
if new { &mut self.new_account_updates } else { &mut self.account_updates };
self.trie.trie_mut().update_leaves(
&mut self.account_updates,
|target, min_len| match self.fetched_account_targets.entry(target) {
let updates_len_before = account_updates.len();
self.trie.trie_mut().update_leaves(account_updates, |target, min_len| {
match self.fetched_account_targets.entry(target) {
Entry::Occupied(mut entry) => {
if min_len < *entry.get() {
entry.insert(min_len);
@@ -603,10 +657,10 @@ where
.account_targets
.push(Target::new(target).with_min_len(min_len));
}
},
)?;
}
})?;
Ok(self.account_updates.len() < updates_len_before)
Ok(account_updates.len() < updates_len_before)
}
/// Iterates through all storage tries for which all updates were processed, computes their
@@ -618,7 +672,7 @@ where
skip_all
)]
fn promote_pending_account_updates(&mut self) -> SparseTrieResult<()> {
self.process_leaf_updates()?;
self.process_leaf_updates(false)?;
if self.pending_account_updates.is_empty() {
return Ok(());
@@ -711,7 +765,7 @@ where
//
// We need to keep iterating if any updates are being drained because that might
// indicate that more pending account updates can be promoted.
if !self.process_account_leaf_updates()? {
if !self.process_account_leaf_updates(false)? {
break
}
}

View File

@@ -24,6 +24,18 @@ pub enum LeafUpdate {
Touched,
}
impl LeafUpdate {
/// Returns true if the leaf update is a change.
pub const fn is_changed(&self) -> bool {
matches!(self, Self::Changed(_))
}
/// Returns true if the leaf update is a touched update.
pub const fn is_touched(&self) -> bool {
matches!(self, Self::Touched)
}
}
/// Trait defining common operations for revealed sparse trie implementations.
///
/// This trait abstracts over different sparse trie implementations (serial vs parallel)