mirror of https://github.com/paradigmxyz/reth.git, synced 2026-01-08 23:08:19 -05:00
merge
@@ -3,7 +3,7 @@
use alloy_evm::block::StateChangeSource;
use alloy_primitives::{
    keccak256,
    map::{B256Set, HashSet},
    map::{B256Map, B256Set, HashSet},
    B256,
};
use crossbeam_channel::{unbounded, Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
@@ -883,19 +883,31 @@ impl MultiProofTask {
        skip(self, update),
        fields(accounts = update.len(), chunks = 0)
    )]
    fn on_state_update(&mut self, source: StateChangeSource, update: EvmState) -> u64 {
    fn on_state_update(
        &mut self,
        source: StateChangeSource,
        update: EvmState,
        batch_removed_keys: &BatchedRemovedKeys,
    ) -> u64 {
        let hashed_state_update = evm_state_to_hashed_post_state(update);

        // NOTE: Removal tracking is done at the batch site via record_removals(),
        // which is called on each sub-update BEFORE extend() merges them.
        // This ensures intermediate deletions (e.g., 100→0→100) are captured.
        // We intentionally do NOT call update_with_state() here because it would
        // clear removal flags via remove_removed() when seeing non-zero values.
        // Update the persistent removed keys state based on the MERGED state update.
        // This tracks the "current" removal state for subsequent batches.
        self.multi_added_removed_keys.update_with_state(&hashed_state_update);

        // For proof selection within THIS batch, we need to include any keys that were
        // deleted mid-batch but recreated (e.g., 100→0→100). The merged state shows
        // the final value, but update_with_state clears the removal flag when it sees
        // non-zero values. So we apply the batch_removed_keys to override those clears.
        let mut proof_added_removed_keys = self.multi_added_removed_keys.clone();
        if !batch_removed_keys.is_empty() {
            batch_removed_keys.apply_to(&mut proof_added_removed_keys);
        }

        // Split the state update into already fetched and not fetched according to the proof
        // targets.
        let (fetched_state_update, not_fetched_state_update) = hashed_state_update
            .partition_by_targets(&self.fetched_proof_targets, &self.multi_added_removed_keys);
            .partition_by_targets(&self.fetched_proof_targets, &proof_added_removed_keys);

        let mut state_updates = 0;
        // If there are any accounts or storage slots that we already fetched the proofs for,
@@ -909,7 +921,7 @@ impl MultiProofTask {
        }

        // Clone+Arc MultiAddedRemovedKeys for sharing with the dispatched multiproof tasks
        let multi_added_removed_keys = Arc::new(self.multi_added_removed_keys.clone());
        let multi_added_removed_keys = Arc::new(proof_added_removed_keys);

        let chunking_len = not_fetched_state_update.chunking_length();
        let mut spawned_proof_targets = MultiProofTargets::default();
@@ -1104,25 +1116,14 @@ impl MultiProofTask {
        self.metrics.state_update_batch_size_histogram.record(num_batched as f64);

        // Merge all accumulated updates into a single EvmState payload while
        // recording removals BEFORE extend() can overwrite them.
        // Use drain to preserve the buffer allocation.
        let mut accumulated_iter = ctx.accumulated_state_updates.drain(..);
        let (batch_source, mut merged_update) = accumulated_iter
            .next()
            .expect("state update batch always has at least one entry");

        // Record removals from the first update
        self.multi_added_removed_keys.record_removals(&merged_update);

        for (_, next_update) in accumulated_iter {
            // Record removals BEFORE extend() overwrites them
            self.multi_added_removed_keys.record_removals(&next_update);
            merged_update.extend(next_update);
        }
        // preserving deletion information. Use drain to preserve buffer allocation.
        let (batch_source, merged_update, batch_removed_keys) =
            batch_state_updates(ctx.accumulated_state_updates.drain(..))
                .expect("state update batch always has at least one entry");

        let batch_len = merged_update.len();
        batch_metrics.state_update_proofs_requested +=
            self.on_state_update(batch_source, merged_update);
            self.on_state_update(batch_source, merged_update, &batch_removed_keys);
        debug!(
            target: "engine::tree::payload_processor::multiproof",
            ?batch_source,
@@ -1407,7 +1408,9 @@ fn get_proof_targets(

    // first collect all new accounts (not previously fetched)
    for &hashed_address in state_update.accounts.keys() {
        if !fetched_proof_targets.contains_key(&hashed_address) {
        if !fetched_proof_targets.contains_key(&hashed_address) ||
            multi_added_removed_keys.get_accounts().is_removed(&hashed_address)
        {
            targets.insert(hashed_address, HashSet::default());
        }
    }
@@ -1438,6 +1441,81 @@ fn get_proof_targets(
    targets
}

/// Tracks removals across batched state updates so deletion signals are not lost when updates are
/// merged.
#[derive(Default, Debug)]
struct BatchedRemovedKeys {
    accounts: HashSet<B256>,
    storages: B256Map<HashSet<B256>>,
}

impl BatchedRemovedKeys {
    /// Records any account or storage removals observed in the given state update.
    fn record(&mut self, update: &EvmState) {
        for (address, account) in update {
            if !account.is_touched() {
                continue;
            }

            let hashed_address = keccak256(*address);

            // Selfdestruct clears storage, so force a refetch of the account proof.
            if account.is_selfdestructed() {
                self.accounts.insert(hashed_address);
                continue;
            }

            for (slot, value) in &account.storage {
                if value.is_changed() && value.present_value.is_zero() {
                    self.storages
                        .entry(hashed_address)
                        .or_default()
                        .insert(keccak256(B256::from(*slot)));
                }
            }
        }
    }

    /// Applies recorded removals to the given [`MultiAddedRemovedKeys`].
    fn apply_to(&self, added_removed_keys: &mut MultiAddedRemovedKeys) {
        for account in &self.accounts {
            added_removed_keys.mark_account_removed(*account);
        }

        for (hashed_address, slots) in &self.storages {
            for slot in slots {
                added_removed_keys.mark_storage_removed(*hashed_address, *slot);
            }
        }
    }

    /// Returns true if no removals were recorded.
    fn is_empty(&self) -> bool {
        self.accounts.is_empty() && self.storages.is_empty()
    }
}
/// Merges multiple state updates while preserving deletion information.
///
/// When `extend()` merges updates, intermediate deletions (values set to zero) are overwritten
/// by later non-zero values. This function captures those deletions before they're lost,
/// ensuring proofs are correctly refetched for keys that were deleted mid-batch.
///
/// Returns `(source, merged_state, removed_keys)` or `None` if the iterator is empty.
fn batch_state_updates(
    updates: impl Iterator<Item = (StateChangeSource, EvmState)>,
) -> Option<(StateChangeSource, EvmState, BatchedRemovedKeys)> {
    let mut iter = updates;
    let (source, mut merged) = iter.next()?;
    let mut removed = BatchedRemovedKeys::default();
    removed.record(&merged);
    for (_, next) in iter {
        removed.record(&next);
        merged.extend(next);
    }
    Some((source, merged, removed))
}

/// Dispatches work items as a single unit or in chunks based on target size and worker
/// availability.
#[allow(clippy::too_many_arguments)]
@@ -1962,6 +2040,158 @@ mod tests {
        assert!(!targets.contains_key(&addr));
    }

    #[test]
    fn test_get_proof_targets_refetches_removed_account() {
        let mut state = HashedPostState::default();
        let mut fetched = MultiProofTargets::default();
        let mut multi_added_removed_keys = MultiAddedRemovedKeys::new();

        let addr = B256::random();

        state.accounts.insert(addr, Some(Default::default()));
        fetched.insert(addr, HashSet::default());
        multi_added_removed_keys.mark_account_removed(addr);

        let targets = get_proof_targets(&state, &fetched, &multi_added_removed_keys);

        assert!(targets.contains_key(&addr));
    }
    #[test]
    fn test_batched_removed_keys_preserve_hidden_deletions() {
        use alloy_primitives::Address;
        use revm_state::{Account, AccountInfo, AccountStatus, EvmStorageSlot};

        let mut removed_keys = BatchedRemovedKeys::default();
        let address = Address::random();
        let slot = U256::from(1);

        // Record a deletion in an intermediate update.
        let mut delete_update = EvmState::default();
        delete_update.insert(
            address,
            Account {
                info: AccountInfo::default(),
                transaction_id: 0,
                storage: std::iter::once((
                    slot,
                    EvmStorageSlot::new_changed(U256::from(10), U256::ZERO, 0),
                ))
                .collect(),
                status: AccountStatus::Touched,
            },
        );
        removed_keys.record(&delete_update);

        // Final merged state re-adds the slot with a non-zero value.
        let mut state = HashedPostState::default();
        let hashed_address = keccak256(address);
        let hashed_slot = keccak256(B256::from(slot));
        let mut storage = HashedStorage::default();
        storage.storage.insert(hashed_slot, U256::from(20));
        state.accounts.insert(hashed_address, Some(Default::default()));
        state.storages.insert(hashed_address, storage);

        // Slot was already fetched previously.
        let mut fetched = MultiProofTargets::default();
        fetched.insert(hashed_address, HashSet::from_iter([hashed_slot]));

        // Apply recorded deletions so proof selection still refetches.
        let mut multi_added_removed_keys = MultiAddedRemovedKeys::new();
        removed_keys.apply_to(&mut multi_added_removed_keys);

        let targets = get_proof_targets(&state, &fetched, &multi_added_removed_keys);
        assert!(targets.get(&hashed_address).is_some_and(|slots| slots.contains(&hashed_slot)));
    }
    #[test]
    fn test_batch_state_updates_preserves_intermediate_deletions() {
        use alloy_primitives::Address;
        use revm_state::{Account, AccountInfo, AccountStatus, EvmStorageSlot};

        let address = Address::random();
        let slot = U256::from(42);

        // Update 1: Create slot with value 100
        let mut update1 = EvmState::default();
        update1.insert(
            address,
            Account {
                info: AccountInfo::default(),
                transaction_id: 0,
                storage: std::iter::once((
                    slot,
                    EvmStorageSlot::new_changed(U256::ZERO, U256::from(100), 0),
                ))
                .collect(),
                status: AccountStatus::Touched,
            },
        );

        // Update 2: Delete slot (set to 0)
        let mut update2 = EvmState::default();
        update2.insert(
            address,
            Account {
                info: AccountInfo::default(),
                transaction_id: 1,
                storage: std::iter::once((
                    slot,
                    EvmStorageSlot::new_changed(U256::from(100), U256::ZERO, 1),
                ))
                .collect(),
                status: AccountStatus::Touched,
            },
        );

        // Update 3: Recreate slot with value 200
        let mut update3 = EvmState::default();
        update3.insert(
            address,
            Account {
                info: AccountInfo::default(),
                transaction_id: 2,
                storage: std::iter::once((
                    slot,
                    EvmStorageSlot::new_changed(U256::ZERO, U256::from(200), 2),
                ))
                .collect(),
                status: AccountStatus::Touched,
            },
        );

        let source = StateChangeSource::Transaction(0);
        let updates = vec![
            (source, update1),
            (StateChangeSource::Transaction(1), update2),
            (StateChangeSource::Transaction(2), update3),
        ];
        let (result_source, merged, removed_keys) =
            batch_state_updates(updates.into_iter()).expect("should have updates");

        // Source should be from first update
        assert!(matches!(result_source, StateChangeSource::Transaction(0)));

        // Merged state should have final value 200
        let account = merged.get(&address).expect("account should exist");
        let slot_value = account.storage.get(&slot).expect("slot should exist");
        assert_eq!(slot_value.present_value, U256::from(200));

        // Crucially: removal should be recorded even though final value is non-zero
        let hashed_address = keccak256(address);
        let hashed_slot = keccak256(B256::from(slot));

        let mut added_removed_keys = MultiAddedRemovedKeys::new();
        removed_keys.apply_to(&mut added_removed_keys);

        let storage_keys = added_removed_keys.get_storage(&hashed_address);
        assert!(
            storage_keys.is_some_and(|k| k.is_removed(&hashed_slot)),
            "slot should be marked as removed despite final non-zero value"
        );
    }

    /// Verifies that consecutive prefetch proof messages are batched together.
    #[test]
    fn test_prefetch_proofs_batching() {
@@ -2079,7 +2309,7 @@ mod tests {
            assert!(merged_update.contains_key(&addr1));
            assert!(merged_update.contains_key(&addr2));

            task.on_state_update(source, merged_update)
            task.on_state_update(source, merged_update, &BatchedRemovedKeys::default())
        } else {
            panic!("Expected StateUpdate message");
        };
@@ -1,9 +1,8 @@
//! Tracking of keys having been added and removed from the tries.

use crate::HashedPostState;
use alloy_primitives::{keccak256, map::B256Map, B256};
use alloy_primitives::{map::B256Map, B256};
use alloy_trie::proof::AddedRemovedKeys;
use revm_state::EvmState;

/// Tracks added and removed keys across account and storage tries.
#[derive(Debug, Clone)]
@@ -85,40 +84,17 @@ impl MultiAddedRemovedKeys {
        }
    }

    /// Records removals from an EVM state update.
    ///
    /// Unlike [`Self::update_with_state`], this treats removals as monotonic -
    /// once a key is marked removed, it stays removed. This is correct for
    /// intra-block proof invalidation where branch proofs become stale on
    /// deletion regardless of later recreation.
    ///
    /// Call this on each sub-update BEFORE merging with `extend()` to ensure
    /// intermediate deletions are captured.
    pub fn record_removals(&mut self, update: &EvmState) {
        for (address, account) in update {
            if !account.is_touched() {
                continue;
            }
    /// Marks an account as removed.
    pub fn mark_account_removed(&mut self, account: B256) {
        self.account.insert_removed(account);
    }
            let hashed_address = keccak256(*address);

            // Selfdestruct wipes storage - mark account as removed
            if account.is_selfdestructed() {
                self.account.insert_removed(hashed_address);
                continue;
            }

            // Track storage slots being deleted (set to zero)
            for (slot, value) in &account.storage {
                if value.is_changed() && value.present_value.is_zero() {
                    self.storages
                        .entry(hashed_address)
                        .or_insert_with(default_added_removed_keys)
                        .insert_removed(keccak256(B256::from(*slot)));
                }
            }
            // NOTE: No remove_removed calls - removals are monotonic within a block
        }
    /// Marks a storage slot as removed for the given account.
    pub fn mark_storage_removed(&mut self, hashed_address: B256, slot: B256) {
        self.storages
            .entry(hashed_address)
            .or_insert_with(default_added_removed_keys)
            .insert_removed(slot);
    }
}