From 7e80c3cac24c039434a631d9d396951695692fcb Mon Sep 17 00:00:00 2001 From: yongkangc Date: Mon, 15 Dec 2025 05:05:50 +0000 Subject: [PATCH] merrge --- .../src/tree/payload_processor/multiproof.rs | 284 ++++++++++++++++-- crates/trie/common/src/added_removed_keys.rs | 46 +-- 2 files changed, 268 insertions(+), 62 deletions(-) diff --git a/crates/engine/tree/src/tree/payload_processor/multiproof.rs b/crates/engine/tree/src/tree/payload_processor/multiproof.rs index 9c4faa158b..52c09bd31d 100644 --- a/crates/engine/tree/src/tree/payload_processor/multiproof.rs +++ b/crates/engine/tree/src/tree/payload_processor/multiproof.rs @@ -3,7 +3,7 @@ use alloy_evm::block::StateChangeSource; use alloy_primitives::{ keccak256, - map::{B256Set, HashSet}, + map::{B256Map, B256Set, HashSet}, B256, }; use crossbeam_channel::{unbounded, Receiver as CrossbeamReceiver, Sender as CrossbeamSender}; @@ -883,19 +883,31 @@ impl MultiProofTask { skip(self, update), fields(accounts = update.len(), chunks = 0) )] - fn on_state_update(&mut self, source: StateChangeSource, update: EvmState) -> u64 { + fn on_state_update( + &mut self, + source: StateChangeSource, + update: EvmState, + batch_removed_keys: &BatchedRemovedKeys, + ) -> u64 { let hashed_state_update = evm_state_to_hashed_post_state(update); - // NOTE: Removal tracking is done at the batch site via record_removals(), - // which is called on each sub-update BEFORE extend() merges them. - // This ensures intermediate deletions (e.g., 100→0→100) are captured. - // We intentionally do NOT call update_with_state() here because it would - // clear removal flags via remove_removed() when seeing non-zero values. + // Update the persistent removed keys state based on the MERGED state update. + // This tracks the "current" removal state for subsequent batches. + self.multi_added_removed_keys.update_with_state(&hashed_state_update); + + // For proof selection within THIS batch, we need to include any keys that were + // deleted mid-batch but recreated (e.g., 100→0→100). The merged state shows + // the final value, but update_with_state clears the removal flag when it sees + // non-zero values. So we apply the batch_removed_keys to override those clears. + let mut proof_added_removed_keys = self.multi_added_removed_keys.clone(); + if !batch_removed_keys.is_empty() { + batch_removed_keys.apply_to(&mut proof_added_removed_keys); + } // Split the state update into already fetched and not fetched according to the proof // targets. let (fetched_state_update, not_fetched_state_update) = hashed_state_update - .partition_by_targets(&self.fetched_proof_targets, &self.multi_added_removed_keys); + .partition_by_targets(&self.fetched_proof_targets, &proof_added_removed_keys); let mut state_updates = 0; // If there are any accounts or storage slots that we already fetched the proofs for, @@ -909,7 +921,7 @@ impl MultiProofTask { } // Clone+Arc MultiAddedRemovedKeys for sharing with the dispatched multiproof tasks - let multi_added_removed_keys = Arc::new(self.multi_added_removed_keys.clone()); + let multi_added_removed_keys = Arc::new(proof_added_removed_keys); let chunking_len = not_fetched_state_update.chunking_length(); let mut spawned_proof_targets = MultiProofTargets::default(); @@ -1104,25 +1116,14 @@ impl MultiProofTask { self.metrics.state_update_batch_size_histogram.record(num_batched as f64); // Merge all accumulated updates into a single EvmState payload while - // recording removals BEFORE extend() can overwrite them. - // Use drain to preserve the buffer allocation. - let mut accumulated_iter = ctx.accumulated_state_updates.drain(..); - let (batch_source, mut merged_update) = accumulated_iter - .next() - .expect("state update batch always has at least one entry"); - - // Record removals from the first update - self.multi_added_removed_keys.record_removals(&merged_update); - - for (_, next_update) in accumulated_iter { - // Record removals BEFORE extend() overwrites them - self.multi_added_removed_keys.record_removals(&next_update); - merged_update.extend(next_update); - } + // preserving deletion information. Use drain to preserve buffer allocation. + let (batch_source, merged_update, batch_removed_keys) = + batch_state_updates(ctx.accumulated_state_updates.drain(..)) + .expect("state update batch always has at least one entry"); let batch_len = merged_update.len(); batch_metrics.state_update_proofs_requested += - self.on_state_update(batch_source, merged_update); + self.on_state_update(batch_source, merged_update, &batch_removed_keys); debug!( target: "engine::tree::payload_processor::multiproof", ?batch_source, @@ -1407,7 +1408,9 @@ fn get_proof_targets( // first collect all new accounts (not previously fetched) for &hashed_address in state_update.accounts.keys() { - if !fetched_proof_targets.contains_key(&hashed_address) { + if !fetched_proof_targets.contains_key(&hashed_address) || + multi_added_removed_keys.get_accounts().is_removed(&hashed_address) + { targets.insert(hashed_address, HashSet::default()); } } @@ -1438,6 +1441,81 @@ fn get_proof_targets( targets } +/// Tracks removals across batched state updates so deletion signals are not lost when updates are +/// merged. +#[derive(Default, Debug)] +struct BatchedRemovedKeys { + accounts: HashSet, + storages: B256Map>, +} + +impl BatchedRemovedKeys { + /// Records any account or storage removals observed in the given state update. + fn record(&mut self, update: &EvmState) { + for (address, account) in update { + if !account.is_touched() { + continue; + } + + let hashed_address = keccak256(*address); + + // Selfdestruct clears storage, so force a refetch of the account proof. + if account.is_selfdestructed() { + self.accounts.insert(hashed_address); + continue; + } + + for (slot, value) in &account.storage { + if value.is_changed() && value.present_value.is_zero() { + self.storages + .entry(hashed_address) + .or_default() + .insert(keccak256(B256::from(*slot))); + } + } + } + } + + /// Applies recorded removals to the given [`MultiAddedRemovedKeys`]. + fn apply_to(&self, added_removed_keys: &mut MultiAddedRemovedKeys) { + for account in &self.accounts { + added_removed_keys.mark_account_removed(*account); + } + + for (hashed_address, slots) in &self.storages { + for slot in slots { + added_removed_keys.mark_storage_removed(*hashed_address, *slot); + } + } + } + + /// Returns true if no removals were recorded. + fn is_empty(&self) -> bool { + self.accounts.is_empty() && self.storages.is_empty() + } +} + +/// Merges multiple state updates while preserving deletion information. +/// +/// When `extend()` merges updates, intermediate deletions (values set to zero) are overwritten +/// by later non-zero values. This function captures those deletions before they're lost, +/// ensuring proofs are correctly refetched for keys that were deleted mid-batch. +/// +/// Returns `(source, merged_state, removed_keys)` or `None` if the iterator is empty. +fn batch_state_updates( + updates: impl Iterator, +) -> Option<(StateChangeSource, EvmState, BatchedRemovedKeys)> { + let mut iter = updates; + let (source, mut merged) = iter.next()?; + let mut removed = BatchedRemovedKeys::default(); + removed.record(&merged); + for (_, next) in iter { + removed.record(&next); + merged.extend(next); + } + Some((source, merged, removed)) +} + /// Dispatches work items as a single unit or in chunks based on target size and worker /// availability. #[allow(clippy::too_many_arguments)] @@ -1962,6 +2040,158 @@ mod tests { assert!(!targets.contains_key(&addr)); } + #[test] + fn test_get_proof_targets_refetches_removed_account() { + let mut state = HashedPostState::default(); + let mut fetched = MultiProofTargets::default(); + let mut multi_added_removed_keys = MultiAddedRemovedKeys::new(); + + let addr = B256::random(); + + state.accounts.insert(addr, Some(Default::default())); + fetched.insert(addr, HashSet::default()); + multi_added_removed_keys.mark_account_removed(addr); + + let targets = get_proof_targets(&state, &fetched, &multi_added_removed_keys); + + assert!(targets.contains_key(&addr)); + } + + #[test] + fn test_batched_removed_keys_preserve_hidden_deletions() { + use alloy_primitives::Address; + use revm_state::{Account, AccountInfo, AccountStatus, EvmStorageSlot}; + + let mut removed_keys = BatchedRemovedKeys::default(); + let address = Address::random(); + let slot = U256::from(1); + + // Record a deletion in an intermediate update. + let mut delete_update = EvmState::default(); + delete_update.insert( + address, + Account { + info: AccountInfo::default(), + transaction_id: 0, + storage: std::iter::once(( + slot, + EvmStorageSlot::new_changed(U256::from(10), U256::ZERO, 0), + )) + .collect(), + status: AccountStatus::Touched, + }, + ); + removed_keys.record(&delete_update); + + // Final merged state re-adds the slot with a non-zero value. + let mut state = HashedPostState::default(); + let hashed_address = keccak256(address); + let hashed_slot = keccak256(B256::from(slot)); + let mut storage = HashedStorage::default(); + storage.storage.insert(hashed_slot, U256::from(20)); + state.accounts.insert(hashed_address, Some(Default::default())); + state.storages.insert(hashed_address, storage); + + // Slot was already fetched previously. + let mut fetched = MultiProofTargets::default(); + fetched.insert(hashed_address, HashSet::from_iter([hashed_slot])); + + // Apply recorded deletions so proof selection still refetches. + let mut multi_added_removed_keys = MultiAddedRemovedKeys::new(); + removed_keys.apply_to(&mut multi_added_removed_keys); + + let targets = get_proof_targets(&state, &fetched, &multi_added_removed_keys); + assert!(targets.get(&hashed_address).is_some_and(|slots| slots.contains(&hashed_slot))); + } + + #[test] + fn test_batch_state_updates_preserves_intermediate_deletions() { + use alloy_primitives::Address; + use revm_state::{Account, AccountInfo, AccountStatus, EvmStorageSlot}; + + let address = Address::random(); + let slot = U256::from(42); + + // Update 1: Create slot with value 100 + let mut update1 = EvmState::default(); + update1.insert( + address, + Account { + info: AccountInfo::default(), + transaction_id: 0, + storage: std::iter::once(( + slot, + EvmStorageSlot::new_changed(U256::ZERO, U256::from(100), 0), + )) + .collect(), + status: AccountStatus::Touched, + }, + ); + + // Update 2: Delete slot (set to 0) + let mut update2 = EvmState::default(); + update2.insert( + address, + Account { + info: AccountInfo::default(), + transaction_id: 1, + storage: std::iter::once(( + slot, + EvmStorageSlot::new_changed(U256::from(100), U256::ZERO, 1), + )) + .collect(), + status: AccountStatus::Touched, + }, + ); + + // Update 3: Recreate slot with value 200 + let mut update3 = EvmState::default(); + update3.insert( + address, + Account { + info: AccountInfo::default(), + transaction_id: 2, + storage: std::iter::once(( + slot, + EvmStorageSlot::new_changed(U256::ZERO, U256::from(200), 2), + )) + .collect(), + status: AccountStatus::Touched, + }, + ); + + let source = StateChangeSource::Transaction(0); + let updates = vec![ + (source, update1), + (StateChangeSource::Transaction(1), update2), + (StateChangeSource::Transaction(2), update3), + ]; + + let (result_source, merged, removed_keys) = + batch_state_updates(updates.into_iter()).expect("should have updates"); + + // Source should be from first update + assert!(matches!(result_source, StateChangeSource::Transaction(0))); + + // Merged state should have final value 200 + let account = merged.get(&address).expect("account should exist"); + let slot_value = account.storage.get(&slot).expect("slot should exist"); + assert_eq!(slot_value.present_value, U256::from(200)); + + // Crucially: removal should be recorded even though final value is non-zero + let hashed_address = keccak256(address); + let hashed_slot = keccak256(B256::from(slot)); + + let mut added_removed_keys = MultiAddedRemovedKeys::new(); + removed_keys.apply_to(&mut added_removed_keys); + + let storage_keys = added_removed_keys.get_storage(&hashed_address); + assert!( + storage_keys.is_some_and(|k| k.is_removed(&hashed_slot)), + "slot should be marked as removed despite final non-zero value" + ); + } + /// Verifies that consecutive prefetch proof messages are batched together. #[test] fn test_prefetch_proofs_batching() { @@ -2079,7 +2309,7 @@ mod tests { assert!(merged_update.contains_key(&addr1)); assert!(merged_update.contains_key(&addr2)); - task.on_state_update(source, merged_update) + task.on_state_update(source, merged_update, &BatchedRemovedKeys::default()) } else { panic!("Expected StateUpdate message"); }; diff --git a/crates/trie/common/src/added_removed_keys.rs b/crates/trie/common/src/added_removed_keys.rs index ef958f4d3f..6a84d6d8e0 100644 --- a/crates/trie/common/src/added_removed_keys.rs +++ b/crates/trie/common/src/added_removed_keys.rs @@ -1,9 +1,8 @@ //! Tracking of keys having been added and removed from the tries. use crate::HashedPostState; -use alloy_primitives::{keccak256, map::B256Map, B256}; +use alloy_primitives::{map::B256Map, B256}; use alloy_trie::proof::AddedRemovedKeys; -use revm_state::EvmState; /// Tracks added and removed keys across account and storage tries. #[derive(Debug, Clone)] @@ -85,40 +84,17 @@ impl MultiAddedRemovedKeys { } } - /// Records removals from an EVM state update. - /// - /// Unlike [`Self::update_with_state`], this treats removals as monotonic - - /// once a key is marked removed, it stays removed. This is correct for - /// intra-block proof invalidation where branch proofs become stale on - /// deletion regardless of later recreation. - /// - /// Call this on each sub-update BEFORE merging with `extend()` to ensure - /// intermediate deletions are captured. - pub fn record_removals(&mut self, update: &EvmState) { - for (address, account) in update { - if !account.is_touched() { - continue; - } + /// Marks an account as removed. + pub fn mark_account_removed(&mut self, account: B256) { + self.account.insert_removed(account); + } - let hashed_address = keccak256(*address); - - // Selfdestruct wipes storage - mark account as removed - if account.is_selfdestructed() { - self.account.insert_removed(hashed_address); - continue; - } - - // Track storage slots being deleted (set to zero) - for (slot, value) in &account.storage { - if value.is_changed() && value.present_value.is_zero() { - self.storages - .entry(hashed_address) - .or_insert_with(default_added_removed_keys) - .insert_removed(keccak256(B256::from(*slot))); - } - } - // NOTE: No remove_removed calls - removals are monotonic within a block - } + /// Marks a storage slot as removed for the given account. + pub fn mark_storage_removed(&mut self, hashed_address: B256, slot: B256) { + self.storages + .entry(hashed_address) + .or_insert_with(default_added_removed_keys) + .insert_removed(slot); } }