Compare commits

...

8 Commits

Author SHA1 Message Date
yongkangc
7e80c3cac2 merge 2025-12-15 05:05:50 +00:00
yongkangc
6e8d62617d fix(multiproof): state root mismatch
- Updated the handling of state updates to ensure that removals are recorded before merging updates, preserving the integrity of intermediate deletions.
- Introduced a new method, `record_removals`, to track monotonic removals from EVM state updates, ensuring that once a key is marked as removed, it remains so for proof invalidation.
- Improved comments for clarity on the removal tracking process and its implications for state updates.
2025-12-15 02:52:33 +00:00
yongkangc
866b8ded5f refactor(multiproof): streamline state update message handling
- Consolidated the sending of MultiProofMessage::StateUpdate to improve readability.
- Updated comments for clarity regarding the batch source logic in tests, ensuring it reflects the expected behavior of the PreBlock source.
2025-12-10 09:41:23 +00:00
yongkangc
51985e249c refactor(multiproof): batch consecutive
- Simplified the batching logic for state updates by removing unnecessary checks and consolidating the handling of different source types.
- Updated tests to verify that state updates from various sources can be batched together correctly while respecting the target limits.
- Improved clarity and maintainability of the code by refining comments and restructuring the logic for merging updates.
2025-12-10 09:41:23 +00:00
yongkangc
5ac911b707 refactor(engine): extract multiproof batch context into structs
Extract &mut parameters from process_multiproof_message into:
- MultiproofBatchCtx: core processing state (pending_msg, timing, updates_finished)
- MultiproofBatchMetrics: counters for proofs processed/requested

This improves code organization and reduces function parameter count.
2025-12-10 09:41:23 +00:00
yongkangc
e9a5a11a9f fix(engine): rename outcome to num_chunks for clarity
Addresses reviewer nit: the variable returned from dispatch_with_chunking
represents number of chunks, so the name should reflect that.
2025-12-10 09:41:23 +00:00
yongkangc
7dd14651e4 revert comparison 2025-12-10 09:41:22 +00:00
yongkangc
51ef406b94 Add bench compare latency stats 2025-12-10 09:38:03 +00:00
3 changed files with 710 additions and 257 deletions

View File

@@ -3,7 +3,7 @@
use alloy_evm::block::StateChangeSource;
use alloy_primitives::{
keccak256,
map::{B256Set, HashSet},
map::{B256Map, B256Set, HashSet},
B256,
};
use crossbeam_channel::{unbounded, Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
@@ -23,7 +23,7 @@ use reth_trie_parallel::{
StorageProofInput,
},
};
use std::{collections::BTreeMap, mem, ops::DerefMut, sync::Arc, time::Instant};
use std::{collections::BTreeMap, ops::DerefMut, sync::Arc, time::Instant};
use tracing::{debug, error, instrument, trace};
/// Maximum number of targets to batch together for prefetch batching.
@@ -883,16 +883,31 @@ impl MultiProofTask {
skip(self, update),
fields(accounts = update.len(), chunks = 0)
)]
fn on_state_update(&mut self, source: StateChangeSource, update: EvmState) -> u64 {
fn on_state_update(
&mut self,
source: StateChangeSource,
update: EvmState,
batch_removed_keys: &BatchedRemovedKeys,
) -> u64 {
let hashed_state_update = evm_state_to_hashed_post_state(update);
// Update removed keys based on the state update.
// Update the persistent removed keys state based on the MERGED state update.
// This tracks the "current" removal state for subsequent batches.
self.multi_added_removed_keys.update_with_state(&hashed_state_update);
// For proof selection within THIS batch, we need to include any keys that were
// deleted mid-batch but recreated (e.g., 100→0→100). The merged state shows
// the final value, but update_with_state clears the removal flag when it sees
// non-zero values. So we apply the batch_removed_keys to override those clears.
let mut proof_added_removed_keys = self.multi_added_removed_keys.clone();
if !batch_removed_keys.is_empty() {
batch_removed_keys.apply_to(&mut proof_added_removed_keys);
}
// Split the state update into already fetched and not fetched according to the proof
// targets.
let (fetched_state_update, not_fetched_state_update) = hashed_state_update
.partition_by_targets(&self.fetched_proof_targets, &self.multi_added_removed_keys);
.partition_by_targets(&self.fetched_proof_targets, &proof_added_removed_keys);
let mut state_updates = 0;
// If there are any accounts or storage slots that we already fetched the proofs for,
@@ -906,7 +921,7 @@ impl MultiProofTask {
}
// Clone+Arc MultiAddedRemovedKeys for sharing with the dispatched multiproof tasks
let multi_added_removed_keys = Arc::new(self.multi_added_removed_keys.clone());
let multi_added_removed_keys = Arc::new(proof_added_removed_keys);
let chunking_len = not_fetched_state_update.chunking_length();
let mut spawned_proof_targets = MultiProofTargets::default();
@@ -1056,7 +1071,7 @@ impl MultiProofTask {
false
}
// State update: batch consecutive updates from the same source
// State update: batch consecutive updates up to the target cap
MultiProofMessage::StateUpdate(source, update) => {
trace!(target: "engine::tree::payload_processor::multiproof", "processing MultiProofMessage::StateUpdate");
@@ -1077,18 +1092,6 @@ impl MultiProofTask {
while accumulated_targets < STATE_UPDATE_MAX_BATCH_TARGETS {
match self.rx.try_recv() {
Ok(MultiProofMessage::StateUpdate(next_source, next_update)) => {
let (batch_source, batch_update) = &ctx.accumulated_state_updates[0];
if !can_batch_state_update(
*batch_source,
batch_update,
next_source,
&next_update,
) {
ctx.pending_msg =
Some(MultiProofMessage::StateUpdate(next_source, next_update));
break;
}
let next_estimate = estimate_evm_state_targets(&next_update);
// Would exceed batch cap; leave pending to dispatch on next iteration.
if accumulated_targets + next_estimate > STATE_UPDATE_MAX_BATCH_TARGETS
@@ -1112,29 +1115,15 @@ impl MultiProofTask {
let num_batched = ctx.accumulated_state_updates.len();
self.metrics.state_update_batch_size_histogram.record(num_batched as f64);
#[cfg(debug_assertions)]
{
let batch_source = ctx.accumulated_state_updates[0].0;
let batch_update = &ctx.accumulated_state_updates[0].1;
debug_assert!(ctx.accumulated_state_updates.iter().all(|(source, update)| {
can_batch_state_update(batch_source, batch_update, *source, update)
}));
}
// Merge all accumulated updates into a single EvmState payload.
// Use drain to preserve the buffer allocation.
let mut accumulated_iter = ctx.accumulated_state_updates.drain(..);
let (mut batch_source, mut merged_update) = accumulated_iter
.next()
.expect("state update batch always has at least one entry");
for (next_source, next_update) in accumulated_iter {
batch_source = next_source;
merged_update.extend(next_update);
}
// Merge all accumulated updates into a single EvmState payload while
// preserving deletion information. Use drain to preserve buffer allocation.
let (batch_source, merged_update, batch_removed_keys) =
batch_state_updates(ctx.accumulated_state_updates.drain(..))
.expect("state update batch always has at least one entry");
let batch_len = merged_update.len();
batch_metrics.state_update_proofs_requested +=
self.on_state_update(batch_source, merged_update);
self.on_state_update(batch_source, merged_update, &batch_removed_keys);
debug!(
target: "engine::tree::payload_processor::multiproof",
?batch_source,
@@ -1419,7 +1408,9 @@ fn get_proof_targets(
// first collect all new accounts (not previously fetched)
for &hashed_address in state_update.accounts.keys() {
if !fetched_proof_targets.contains_key(&hashed_address) {
if !fetched_proof_targets.contains_key(&hashed_address) ||
multi_added_removed_keys.get_accounts().is_removed(&hashed_address)
{
targets.insert(hashed_address, HashSet::default());
}
}
@@ -1450,6 +1441,81 @@ fn get_proof_targets(
targets
}
/// Removal signals collected while several state updates are folded into one batch.
///
/// Merging updates keeps only the final value of each key, so a key that was removed by one
/// update and written again by a later one would no longer look removed. This type remembers
/// every removal seen along the way so it can be re-applied to a [`MultiAddedRemovedKeys`].
#[derive(Default, Debug)]
struct BatchedRemovedKeys {
    /// Hashed addresses of accounts that were self-destructed in some recorded update.
    accounts: HashSet<B256>,
    /// Per hashed address, the hashed storage slots that some recorded update changed to zero.
    storages: B256Map<HashSet<B256>>,
}

impl BatchedRemovedKeys {
    /// Scans `update` and remembers every account or storage removal it contains.
    ///
    /// Untouched accounts are ignored. A self-destructed account is recorded at the account
    /// level only; for every other touched account, each slot that changed and now holds zero
    /// is recorded under the account's hashed address.
    fn record(&mut self, update: &EvmState) {
        for (address, account) in update.iter().filter(|(_, account)| account.is_touched()) {
            let hashed_address = keccak256(*address);

            if account.is_selfdestructed() {
                // Selfdestruct clears storage, so the account proof itself must be refetched.
                self.accounts.insert(hashed_address);
            } else {
                let zeroed_slots = account
                    .storage
                    .iter()
                    .filter(|(_, value)| value.is_changed() && value.present_value.is_zero())
                    .map(|(slot, _)| keccak256(B256::from(*slot)));

                // The map entry is only created once a zeroed slot is actually found, which
                // keeps `is_empty` accurate.
                for hashed_slot in zeroed_slots {
                    self.storages.entry(hashed_address).or_default().insert(hashed_slot);
                }
            }
        }
    }

    /// Marks every recorded removal on the given [`MultiAddedRemovedKeys`].
    fn apply_to(&self, added_removed_keys: &mut MultiAddedRemovedKeys) {
        self.accounts
            .iter()
            .copied()
            .for_each(|hashed_address| added_removed_keys.mark_account_removed(hashed_address));

        for (hashed_address, hashed_slots) in &self.storages {
            hashed_slots.iter().copied().for_each(|hashed_slot| {
                added_removed_keys.mark_storage_removed(*hashed_address, hashed_slot)
            });
        }
    }

    /// Returns `true` when neither an account nor a storage removal has been recorded.
    fn is_empty(&self) -> bool {
        self.storages.is_empty() && self.accounts.is_empty()
    }
}
/// Merges multiple state updates while preserving deletion information.
///
/// The first update seeds the merged state and supplies the returned source. Every update,
/// including the first, is passed to [`BatchedRemovedKeys::record`] *before* it is merged, so
/// that intermediate deletions (values set to zero and later overwritten by non-zero values)
/// are still known when proofs are selected.
///
/// Accounts are merged per address instead of with `EvmState::extend`. `extend` replaces the
/// whole account entry for an address that is already present, which silently drops storage
/// slots changed by an earlier update whenever a later update touches the same account without
/// changing those slots again. Here the later account still wins for account info, status and
/// any slot it changed, but earlier changed slots that it did not change are carried forward.
///
/// Returns `(source, merged_state, removed_keys)` or `None` if the iterator is empty.
fn batch_state_updates(
    updates: impl Iterator<Item = (StateChangeSource, EvmState)>,
) -> Option<(StateChangeSource, EvmState, BatchedRemovedKeys)> {
    let mut updates = updates;
    let (source, mut merged) = updates.next()?;

    let mut removed = BatchedRemovedKeys::default();
    removed.record(&merged);

    for (_, next) in updates {
        removed.record(&next);

        for (address, mut account) in next {
            if let Some(previous) = merged.remove(&address) {
                if previous.is_touched() {
                    if !account.is_touched() {
                        // The later entry carries no changes; keep the earlier, touched one
                        // instead of letting it be overwritten.
                        merged.insert(address, previous);
                        continue;
                    }

                    // A selfdestruct on either side resets storage, so slots are only carried
                    // forward between two live versions of the account.
                    // NOTE(review): an account destroyed earlier in the batch and re-created
                    // later is still replaced wholesale, exactly as `extend` did; the
                    // account-level removal is available via the returned removed keys.
                    if !previous.is_selfdestructed() && !account.is_selfdestructed() {
                        for (slot, value) in previous.storage {
                            let superseded = account
                                .storage
                                .get(&slot)
                                .is_some_and(|newer| newer.is_changed());
                            if value.is_changed() && !superseded {
                                account.storage.insert(slot, value);
                            }
                        }
                    }
                }
            }

            merged.insert(address, account);
        }
    }

    Some((source, merged, removed))
}
/// Dispatches work items as a single unit or in chunks based on target size and worker
/// availability.
#[allow(clippy::too_many_arguments)]
@@ -1486,44 +1552,6 @@ where
1
}
/// Decides whether `next_update` may join a batch that started with `batch_update`.
///
/// Both updates must come from the same origin (see `same_state_change_source`). Transaction
/// updates with the same transaction ID (`StateChangeSource::Transaction(id)`) are then always
/// mergeable, while pre-block and post-block updates additionally need identical payloads.
fn can_batch_state_update(
    batch_source: StateChangeSource,
    batch_update: &EvmState,
    next_source: StateChangeSource,
    next_update: &EvmState,
) -> bool {
    // Pre-/post-block pairs are the only ones that are compared by payload.
    let requires_identical_payload = matches!(
        (batch_source, next_source),
        (StateChangeSource::PreBlock(_), StateChangeSource::PreBlock(_)) |
            (StateChangeSource::PostBlock(_), StateChangeSource::PostBlock(_))
    );

    same_state_change_source(batch_source, next_source) &&
        (!requires_identical_payload || batch_update == next_update)
}
/// Checks whether two state change sources refer to the same origin.
fn same_state_change_source(lhs: StateChangeSource, rhs: StateChangeSource) -> bool {
match (lhs, rhs) {
(StateChangeSource::Transaction(a), StateChangeSource::Transaction(b)) => a == b,
(StateChangeSource::PreBlock(a), StateChangeSource::PreBlock(b)) => {
mem::discriminant(&a) == mem::discriminant(&b)
}
(StateChangeSource::PostBlock(a), StateChangeSource::PostBlock(b)) => {
mem::discriminant(&a) == mem::discriminant(&b)
}
_ => false,
}
}
/// Estimates target count from `EvmState` for batching decisions.
fn estimate_evm_state_targets(state: &EvmState) -> usize {
state
@@ -2012,6 +2040,158 @@ mod tests {
assert!(!targets.contains_key(&addr));
}
/// An account whose proof was already fetched must be targeted again once it is marked as
/// removed.
#[test]
fn test_get_proof_targets_refetches_removed_account() {
    let addr = B256::random();

    // The state update touches the account...
    let mut state = HashedPostState::default();
    state.accounts.insert(addr, Some(Default::default()));

    // ...whose proof has been fetched before...
    let mut fetched = MultiProofTargets::default();
    fetched.insert(addr, HashSet::default());

    // ...and which is flagged as removed.
    let mut multi_added_removed_keys = MultiAddedRemovedKeys::new();
    multi_added_removed_keys.mark_account_removed(addr);

    let targets = get_proof_targets(&state, &fetched, &multi_added_removed_keys);
    assert!(targets.contains_key(&addr));
}
/// A slot deletion recorded from an intermediate update must still force a refetch of an
/// already-fetched slot, even though the final state holds a non-zero value for it.
#[test]
fn test_batched_removed_keys_preserve_hidden_deletions() {
    use alloy_primitives::Address;
    use revm_state::{Account, AccountInfo, AccountStatus, EvmStorageSlot};

    let address = Address::random();
    let slot = U256::from(1);
    let hashed_address = keccak256(address);
    let hashed_slot = keccak256(B256::from(slot));

    // Intermediate update: the slot goes from 10 to zero.
    let zeroed_slot = EvmStorageSlot::new_changed(U256::from(10), U256::ZERO, 0);
    let mut delete_update = EvmState::default();
    delete_update.insert(
        address,
        Account {
            info: AccountInfo::default(),
            transaction_id: 0,
            storage: std::iter::once((slot, zeroed_slot)).collect(),
            status: AccountStatus::Touched,
        },
    );
    let mut removed_keys = BatchedRemovedKeys::default();
    removed_keys.record(&delete_update);

    // Final merged state: the slot is present again with a non-zero value.
    let mut storage = HashedStorage::default();
    storage.storage.insert(hashed_slot, U256::from(20));
    let mut state = HashedPostState::default();
    state.accounts.insert(hashed_address, Some(Default::default()));
    state.storages.insert(hashed_address, storage);

    // The slot's proof was fetched earlier.
    let mut fetched = MultiProofTargets::default();
    fetched.insert(hashed_address, HashSet::from_iter([hashed_slot]));

    // Re-applying the recorded deletion makes proof selection pick the slot again.
    let mut multi_added_removed_keys = MultiAddedRemovedKeys::new();
    removed_keys.apply_to(&mut multi_added_removed_keys);

    let targets = get_proof_targets(&state, &fetched, &multi_added_removed_keys);
    let slot_refetched =
        targets.get(&hashed_address).is_some_and(|slots| slots.contains(&hashed_slot));
    assert!(slot_refetched);
}
/// Batching create -> delete -> recreate of one slot must yield the final value in the merged
/// state while still reporting the slot as removed.
#[test]
fn test_batch_state_updates_preserves_intermediate_deletions() {
    use alloy_primitives::Address;
    use revm_state::{Account, AccountInfo, AccountStatus, EvmStorageSlot};

    let address = Address::random();
    let slot = U256::from(42);

    // Builds a single-account update that moves `slot` from `before` to `after` within
    // transaction `tx_id`.
    let slot_change = |tx_id, before: U256, after: U256| {
        let mut update = EvmState::default();
        update.insert(
            address,
            Account {
                info: AccountInfo::default(),
                transaction_id: tx_id,
                storage: std::iter::once((
                    slot,
                    EvmStorageSlot::new_changed(before, after, tx_id),
                ))
                .collect(),
                status: AccountStatus::Touched,
            },
        );
        update
    };

    let updates = vec![
        // Update 1: create the slot with value 100.
        (StateChangeSource::Transaction(0), slot_change(0, U256::ZERO, U256::from(100))),
        // Update 2: delete the slot (set to 0).
        (StateChangeSource::Transaction(1), slot_change(1, U256::from(100), U256::ZERO)),
        // Update 3: recreate the slot with value 200.
        (StateChangeSource::Transaction(2), slot_change(2, U256::ZERO, U256::from(200))),
    ];

    let (result_source, merged, removed_keys) =
        batch_state_updates(updates.into_iter()).expect("should have updates");

    // The batch is attributed to the first update's source.
    assert!(matches!(result_source, StateChangeSource::Transaction(0)));

    // The merged state carries the final value.
    let merged_slot = merged
        .get(&address)
        .expect("account should exist")
        .storage
        .get(&slot)
        .expect("slot should exist");
    assert_eq!(merged_slot.present_value, U256::from(200));

    // The intermediate deletion is still visible after applying the recorded removals.
    let hashed_address = keccak256(address);
    let hashed_slot = keccak256(B256::from(slot));
    let mut added_removed_keys = MultiAddedRemovedKeys::new();
    removed_keys.apply_to(&mut added_removed_keys);

    assert!(
        added_removed_keys
            .get_storage(&hashed_address)
            .is_some_and(|keys| keys.is_removed(&hashed_slot)),
        "slot should be marked as removed despite final non-zero value"
    );
}
/// Verifies that consecutive prefetch proof messages are batched together.
#[test]
fn test_prefetch_proofs_batching() {
@@ -2129,142 +2309,16 @@ mod tests {
assert!(merged_update.contains_key(&addr1));
assert!(merged_update.contains_key(&addr2));
task.on_state_update(source, merged_update)
task.on_state_update(source, merged_update, &BatchedRemovedKeys::default())
} else {
panic!("Expected StateUpdate message");
};
assert_eq!(proofs_requested, 1);
}
/// Verifies that state updates from different sources are not batched together.
/// Verifies that state updates from different sources batch together.
#[test]
fn test_state_update_batching_separates_sources() {
use alloy_evm::block::StateChangeSource;
use revm_state::Account;
let test_provider_factory = create_test_provider_factory();
let task = create_test_state_root_task(test_provider_factory);
let addr_a1 = alloy_primitives::Address::random();
let addr_b1 = alloy_primitives::Address::random();
let addr_a2 = alloy_primitives::Address::random();
let create_state_update = |addr: alloy_primitives::Address, balance: u64| {
let mut state = EvmState::default();
state.insert(
addr,
Account {
info: revm_state::AccountInfo {
balance: U256::from(balance),
nonce: 1,
code_hash: Default::default(),
code: Default::default(),
},
transaction_id: Default::default(),
storage: Default::default(),
status: revm_state::AccountStatus::Touched,
},
);
state
};
let source_a = StateChangeSource::Transaction(1);
let source_b = StateChangeSource::Transaction(2);
// Queue: A1 (immediate dispatch), B1 (batched), A2 (should become pending)
let tx = task.state_root_message_sender();
tx.send(MultiProofMessage::StateUpdate(source_a, create_state_update(addr_a1, 100)))
.unwrap();
tx.send(MultiProofMessage::StateUpdate(source_b, create_state_update(addr_b1, 200)))
.unwrap();
tx.send(MultiProofMessage::StateUpdate(source_a, create_state_update(addr_a2, 300)))
.unwrap();
let mut pending_msg: Option<MultiProofMessage> = None;
if let Ok(MultiProofMessage::StateUpdate(first_source, _)) = task.rx.recv() {
assert!(same_state_change_source(first_source, source_a));
// Simulate batching loop for remaining messages
let mut accumulated_updates: Vec<(StateChangeSource, EvmState)> = Vec::new();
let mut accumulated_targets = 0usize;
loop {
if accumulated_targets >= STATE_UPDATE_MAX_BATCH_TARGETS {
break;
}
match task.rx.try_recv() {
Ok(MultiProofMessage::StateUpdate(next_source, next_update)) => {
if let Some((batch_source, batch_update)) = accumulated_updates.first() &&
!can_batch_state_update(
*batch_source,
batch_update,
next_source,
&next_update,
)
{
pending_msg =
Some(MultiProofMessage::StateUpdate(next_source, next_update));
break;
}
let next_estimate = estimate_evm_state_targets(&next_update);
if next_estimate > STATE_UPDATE_MAX_BATCH_TARGETS {
pending_msg =
Some(MultiProofMessage::StateUpdate(next_source, next_update));
break;
}
if accumulated_targets + next_estimate > STATE_UPDATE_MAX_BATCH_TARGETS &&
!accumulated_updates.is_empty()
{
pending_msg =
Some(MultiProofMessage::StateUpdate(next_source, next_update));
break;
}
accumulated_targets += next_estimate;
accumulated_updates.push((next_source, next_update));
}
Ok(other_msg) => {
pending_msg = Some(other_msg);
break;
}
Err(_) => break,
}
}
assert_eq!(accumulated_updates.len(), 1, "Should only batch matching sources");
let batch_source = accumulated_updates[0].0;
assert!(same_state_change_source(batch_source, source_b));
let batch_source = accumulated_updates[0].0;
let mut merged_update = accumulated_updates.remove(0).1;
for (_, next_update) in accumulated_updates {
merged_update.extend(next_update);
}
assert!(
same_state_change_source(batch_source, source_b),
"Batch should use matching source"
);
assert!(merged_update.contains_key(&addr_b1));
assert!(!merged_update.contains_key(&addr_a1));
assert!(!merged_update.contains_key(&addr_a2));
} else {
panic!("Expected first StateUpdate");
}
match pending_msg {
Some(MultiProofMessage::StateUpdate(pending_source, pending_update)) => {
assert!(same_state_change_source(pending_source, source_a));
assert!(pending_update.contains_key(&addr_a2));
}
other => panic!("Expected pending StateUpdate with source_a, got {:?}", other),
}
}
/// Verifies that pre-block updates only batch when their payloads are identical.
#[test]
fn test_pre_block_updates_require_payload_match_to_batch() {
fn test_different_sources_batch_together() {
use alloy_evm::block::{StateChangePreBlockSource, StateChangeSource};
use revm_state::Account;
@@ -2294,51 +2348,192 @@ mod tests {
state
};
let source = StateChangeSource::PreBlock(StateChangePreBlockSource::BeaconRootContract);
// Different source types
let source_tx1 = StateChangeSource::Transaction(1);
let source_tx2 = StateChangeSource::Transaction(2);
let source_preblock =
StateChangeSource::PreBlock(StateChangePreBlockSource::BeaconRootContract);
// Queue: first update dispatched immediately, next two should not merge
// Queue updates from different sources
let tx = task.state_root_message_sender();
tx.send(MultiProofMessage::StateUpdate(source, create_state_update(addr1, 100))).unwrap();
tx.send(MultiProofMessage::StateUpdate(source, create_state_update(addr2, 200))).unwrap();
tx.send(MultiProofMessage::StateUpdate(source, create_state_update(addr3, 300))).unwrap();
tx.send(MultiProofMessage::StateUpdate(source_tx1, create_state_update(addr1, 100)))
.unwrap();
tx.send(MultiProofMessage::StateUpdate(source_tx2, create_state_update(addr2, 200)))
.unwrap();
tx.send(MultiProofMessage::StateUpdate(source_preblock, create_state_update(addr3, 300)))
.unwrap();
// Receive and batch all updates
if let Ok(MultiProofMessage::StateUpdate(_, first_update)) = task.rx.recv() {
let mut merged_update = first_update;
let mut num_batched = 1;
while let Ok(MultiProofMessage::StateUpdate(_, next_update)) = task.rx.try_recv() {
merged_update.extend(next_update);
num_batched += 1;
}
// All three updates should batch together regardless of source type
assert_eq!(num_batched, 3, "All updates from different sources should batch together");
assert_eq!(merged_update.len(), 3);
assert!(merged_update.contains_key(&addr1));
assert!(merged_update.contains_key(&addr2));
assert!(merged_update.contains_key(&addr3));
} else {
panic!("Expected StateUpdate message");
}
}
/// Verifies that cross-source batching preserves all state data correctly.
///
/// Three updates (transaction, pre-block and post-block source), each touching a distinct
/// random address, are queued on the task's channel, drained by hand and merged with `extend`.
/// The merged state must still hold every account with its original balance and nonce.
///
/// NOTE(review): the merge is performed inline in this test, so it does not drive the task's
/// own batching loop.
#[test]
fn test_cross_source_batching_preserves_all_state() {
use alloy_evm::block::{
StateChangePostBlockSource, StateChangePreBlockSource, StateChangeSource,
};
use revm_state::Account;
let test_provider_factory = create_test_provider_factory();
let task = create_test_state_root_task(test_provider_factory);
let addr_tx = alloy_primitives::Address::random();
let addr_preblock = alloy_primitives::Address::random();
let addr_postblock = alloy_primitives::Address::random();
// Builds an update containing one touched account with the given balance and nonce and no
// storage.
let create_state_update = |addr: alloy_primitives::Address, balance: u64, nonce: u64| {
let mut state = EvmState::default();
state.insert(
addr,
Account {
info: revm_state::AccountInfo {
balance: U256::from(balance),
nonce,
code_hash: Default::default(),
code: Default::default(),
},
transaction_id: Default::default(),
storage: Default::default(),
status: revm_state::AccountStatus::Touched,
},
);
state
};
let source_tx = StateChangeSource::Transaction(0);
let source_preblock =
StateChangeSource::PreBlock(StateChangePreBlockSource::BeaconRootContract);
let source_postblock =
StateChangeSource::PostBlock(StateChangePostBlockSource::BalanceIncrements);
// Queue updates from all three source types
let tx = task.state_root_message_sender();
tx.send(MultiProofMessage::StateUpdate(source_tx, create_state_update(addr_tx, 100, 1)))
.unwrap();
tx.send(MultiProofMessage::StateUpdate(
source_preblock,
create_state_update(addr_preblock, 200, 2),
))
.unwrap();
tx.send(MultiProofMessage::StateUpdate(
source_postblock,
create_state_update(addr_postblock, 300, 3),
))
.unwrap();
// Receive and batch all updates
if let Ok(MultiProofMessage::StateUpdate(_, first_update)) = task.rx.recv() {
let mut merged_update = first_update;
// Drain whatever is already queued; the loop stops at the first non-StateUpdate message or
// when the channel is empty.
while let Ok(MultiProofMessage::StateUpdate(_, next_update)) = task.rx.try_recv() {
merged_update.extend(next_update);
}
// Verify all accounts are present with correct data
assert_eq!(merged_update.len(), 3);
let tx_account = merged_update.get(&addr_tx).expect("Transaction account missing");
assert_eq!(tx_account.info.balance, U256::from(100));
assert_eq!(tx_account.info.nonce, 1);
let preblock_account =
merged_update.get(&addr_preblock).expect("PreBlock account missing");
assert_eq!(preblock_account.info.balance, U256::from(200));
assert_eq!(preblock_account.info.nonce, 2);
let postblock_account =
merged_update.get(&addr_postblock).expect("PostBlock account missing");
assert_eq!(postblock_account.info.balance, U256::from(300));
assert_eq!(postblock_account.info.nonce, 3);
} else {
panic!("Expected StateUpdate message");
}
}
/// Verifies that mixed-source batches keep the first source for attribution/logging.
#[test]
fn test_mixed_source_batch_preserves_first_source() {
use alloy_evm::block::{
StateChangePostBlockSource, StateChangePreBlockSource, StateChangeSource,
};
use revm_state::Account;
let test_provider_factory = create_test_provider_factory();
let task = create_test_state_root_task(test_provider_factory);
let addr_preblock = alloy_primitives::Address::random();
let addr_tx = alloy_primitives::Address::random();
let addr_postblock = alloy_primitives::Address::random();
let create_state_update = |addr: alloy_primitives::Address, balance: u64, nonce: u64| {
let mut state = EvmState::default();
state.insert(
addr,
Account {
info: revm_state::AccountInfo {
balance: U256::from(balance),
nonce,
code_hash: Default::default(),
code: Default::default(),
},
transaction_id: Default::default(),
storage: Default::default(),
status: revm_state::AccountStatus::Touched,
},
);
state
};
let source_preblock =
StateChangeSource::PreBlock(StateChangePreBlockSource::BeaconRootContract);
let source_tx = StateChangeSource::Transaction(123);
let source_postblock =
StateChangeSource::PostBlock(StateChangePostBlockSource::BalanceIncrements);
// Queue mixed sources; ensure we don't hit the batching cap.
let tx = task.state_root_message_sender();
tx.send(MultiProofMessage::StateUpdate(
source_preblock,
create_state_update(addr_preblock, 1, 1),
))
.unwrap();
tx.send(MultiProofMessage::StateUpdate(source_tx, create_state_update(addr_tx, 2, 2)))
.unwrap();
tx.send(MultiProofMessage::StateUpdate(
source_postblock,
create_state_update(addr_postblock, 3, 3),
))
.unwrap();
let mut pending_msg: Option<MultiProofMessage> = None;
if let Ok(MultiProofMessage::StateUpdate(first_source, first_update)) = task.rx.recv() {
assert!(same_state_change_source(first_source, source));
assert!(first_update.contains_key(&addr1));
let mut accumulated_updates = vec![(first_source, first_update)];
let mut accumulated_targets = estimate_evm_state_targets(&accumulated_updates[0].1);
let mut accumulated_updates: Vec<(StateChangeSource, EvmState)> = Vec::new();
let mut accumulated_targets = 0usize;
loop {
if accumulated_targets >= STATE_UPDATE_MAX_BATCH_TARGETS {
break;
}
while accumulated_targets < STATE_UPDATE_MAX_BATCH_TARGETS {
match task.rx.try_recv() {
Ok(MultiProofMessage::StateUpdate(next_source, next_update)) => {
if let Some((batch_source, batch_update)) = accumulated_updates.first() &&
!can_batch_state_update(
*batch_source,
batch_update,
next_source,
&next_update,
)
{
pending_msg =
Some(MultiProofMessage::StateUpdate(next_source, next_update));
break;
}
let next_estimate = estimate_evm_state_targets(&next_update);
if next_estimate > STATE_UPDATE_MAX_BATCH_TARGETS {
pending_msg =
Some(MultiProofMessage::StateUpdate(next_source, next_update));
break;
}
if accumulated_targets + next_estimate > STATE_UPDATE_MAX_BATCH_TARGETS &&
!accumulated_updates.is_empty()
{
if accumulated_targets + next_estimate > STATE_UPDATE_MAX_BATCH_TARGETS {
pending_msg =
Some(MultiProofMessage::StateUpdate(next_source, next_update));
break;
@@ -2354,24 +2549,128 @@ mod tests {
}
}
assert_eq!(
accumulated_updates.len(),
1,
"Second pre-block update should not merge with a different payload"
assert!(
pending_msg.is_none(),
"Mixed sources within cap should batch without leaving pending messages"
);
let (batched_source, batched_update) = accumulated_updates.remove(0);
assert!(same_state_change_source(batched_source, source));
assert!(batched_update.contains_key(&addr2));
assert!(!batched_update.contains_key(&addr3));
assert_eq!(accumulated_updates.len(), 3);
match pending_msg {
Some(MultiProofMessage::StateUpdate(_, pending_update)) => {
assert!(pending_update.contains_key(&addr3));
}
other => panic!("Expected pending third pre-block update, got {:?}", other),
let mut accumulated_iter = accumulated_updates.into_iter();
let (batch_source, mut merged_update) =
accumulated_iter.next().expect("at least one update");
for (_, next_update) in accumulated_iter {
merged_update.extend(next_update);
}
// Batch source should stay the first source that arrived (PreBlock).
assert!(matches!(batch_source, StateChangeSource::PreBlock(_)));
assert_eq!(merged_update.len(), 3);
assert!(merged_update.contains_key(&addr_preblock));
assert!(merged_update.contains_key(&addr_tx));
assert!(merged_update.contains_key(&addr_postblock));
} else {
panic!("Expected first StateUpdate");
panic!("Expected initial StateUpdate");
}
}
/// Verifies that mixed source batching respects the target limit.
///
/// Three large updates from different sources are queued. The test re-implements the
/// accumulate-until-cap loop by hand using `estimate_evm_state_targets` and
/// `STATE_UPDATE_MAX_BATCH_TARGETS`, and expects the first two updates to be merged while the
/// third is left pending.
#[test]
fn test_mixed_sources_respect_target_limit() {
use alloy_evm::block::{StateChangePreBlockSource, StateChangeSource};
use revm_state::Account;
let test_provider_factory = create_test_provider_factory();
let task = create_test_state_root_task(test_provider_factory);
// Create a helper to generate state updates with many storage slots
let create_large_state_update = |addr: alloy_primitives::Address, num_slots: usize| {
let mut state = EvmState::default();
let mut storage = revm_state::EvmStorage::default();
// NOTE(review): for i == 0 the slot is written as ZERO -> ZERO, i.e. original and present
// values are equal. If the estimator only counts changed slots, each update contributes one
// target fewer than the arithmetic below states - confirm against
// `estimate_evm_state_targets`.
for i in 0..num_slots {
storage.insert(
U256::from(i),
revm_state::EvmStorageSlot::new_changed(U256::ZERO, U256::from(i), 0),
);
}
state.insert(
addr,
Account {
info: revm_state::AccountInfo {
balance: U256::from(100),
nonce: 1,
code_hash: Default::default(),
code: Default::default(),
},
transaction_id: Default::default(),
storage,
status: revm_state::AccountStatus::Touched,
},
);
state
};
let addr1 = alloy_primitives::Address::random();
let addr2 = alloy_primitives::Address::random();
let addr3 = alloy_primitives::Address::random();
// Create updates that will exceed STATE_UPDATE_MAX_BATCH_TARGETS (64)
// Each update: 1 account + slots = targets
// First update: 1 + 30 = 31 targets
// Second update: 1 + 30 = 31 targets (total: 62, still under 64)
// Third update: 1 + 30 = 31 targets (total would be 93, exceeds 64)
let source_tx = StateChangeSource::Transaction(1);
let source_preblock =
StateChangeSource::PreBlock(StateChangePreBlockSource::BeaconRootContract);
let source_tx2 = StateChangeSource::Transaction(2);
let tx = task.state_root_message_sender();
tx.send(MultiProofMessage::StateUpdate(source_tx, create_large_state_update(addr1, 30)))
.unwrap();
tx.send(MultiProofMessage::StateUpdate(
source_preblock,
create_large_state_update(addr2, 30),
))
.unwrap();
tx.send(MultiProofMessage::StateUpdate(source_tx2, create_large_state_update(addr3, 30)))
.unwrap();
// First receive should get first update
if let Ok(MultiProofMessage::StateUpdate(_, first_update)) = task.rx.recv() {
let mut merged_update = first_update;
let mut num_batched = 1;
let mut accumulated_targets = estimate_evm_state_targets(&merged_update);
let mut pending_update: Option<EvmState> = None;
// Batch while under limit
while accumulated_targets < STATE_UPDATE_MAX_BATCH_TARGETS {
match task.rx.try_recv() {
Ok(MultiProofMessage::StateUpdate(_, next_update)) => {
let next_targets = estimate_evm_state_targets(&next_update);
if accumulated_targets + next_targets > STATE_UPDATE_MAX_BATCH_TARGETS {
// Would exceed limit - save as pending and stop batching
pending_update = Some(next_update);
break;
}
accumulated_targets += next_targets;
merged_update.extend(next_update);
num_batched += 1;
}
// Any other message, or an empty channel, ends the batch.
_ => break,
}
}
// Should have batched first two updates (62 targets) but not the third
assert_eq!(num_batched, 2, "Should batch updates up to target limit");
assert!(merged_update.contains_key(&addr1));
assert!(merged_update.contains_key(&addr2));
assert!(!merged_update.contains_key(&addr3), "Third update should not be batched");
// Third update should be pending (not batched due to target limit)
let pending = pending_update.expect("Third update should be pending");
assert!(pending.contains_key(&addr3), "Third update should contain addr3");
} else {
panic!("Expected StateUpdate message");
}
}

View File

@@ -31,6 +31,7 @@ nybbles = { workspace = true, features = ["rlp"] }
# reth
revm-database.workspace = true
revm-state.workspace = true
# `serde` feature
serde = { workspace = true, optional = true }
@@ -64,7 +65,6 @@ bincode.workspace = true
serde.workspace = true
serde_json.workspace = true
serde_with.workspace = true
revm-state.workspace = true
[features]
default = ["std"]

View File

@@ -83,6 +83,19 @@ impl MultiAddedRemovedKeys {
self.storages.entry(address).or_insert_with(default_added_removed_keys);
}
}
/// Records `account` in the removed set of the account-level key tracker.
///
/// NOTE(review): `account` is presumably the keccak256 hash of the address, since the tests in
/// this module query the tracker with `keccak256(address)` — confirm against callers.
pub fn mark_account_removed(&mut self, account: B256) {
    let account_keys = &mut self.account;
    account_keys.insert_removed(account);
}
/// Flags `slot` as removed in the storage key tracker that belongs to `hashed_address`.
///
/// When no tracker exists yet for `hashed_address`, one is created with
/// `default_added_removed_keys` before the slot is recorded.
pub fn mark_storage_removed(&mut self, hashed_address: B256, slot: B256) {
    // Look up (or lazily create) the per-account tracker first, then record the removal on it.
    let storage_keys =
        self.storages.entry(hashed_address).or_insert_with(default_added_removed_keys);
    storage_keys.insert_removed(slot);
}
}
#[cfg(test)]
@@ -184,6 +197,147 @@ mod tests {
assert!(!multi_keys.get_accounts().is_removed(&addr));
}
/// Feeds three successive updates for the same slot (create, delete, recreate) through
/// `record_removals` and checks that the slot stays flagged as removed after the delete.
#[test]
fn test_record_removals_is_monotonic() {
    use alloy_primitives::Address;
    use revm_state::{Account, AccountInfo, AccountStatus, EvmStorageSlot};

    let address = Address::random();
    let slot = U256::from(42);
    let hashed_addr = keccak256(address);
    let hashed_slot = keccak256(B256::from(slot));
    let mut multi_keys = MultiAddedRemovedKeys::new();

    // Step 1: the slot goes from zero to 100.
    let created_storage =
        std::iter::once((slot, EvmStorageSlot::new_changed(U256::ZERO, U256::from(100), 0)))
            .collect();
    let created_account = Account {
        info: AccountInfo::default(),
        transaction_id: 0,
        storage: created_storage,
        status: AccountStatus::Touched,
    };
    let mut update1 = EvmState::default();
    update1.insert(address, created_account);
    multi_keys.record_removals(&update1);

    // A non-zero present value must not register as a removal; the tracker for this account
    // may or may not exist at this point, both are acceptable.
    assert!(!multi_keys
        .get_storage(&hashed_addr)
        .is_some_and(|keys| keys.is_removed(&hashed_slot)));

    // Step 2: the slot goes from 100 back to zero, i.e. it is deleted.
    let deleted_storage =
        std::iter::once((slot, EvmStorageSlot::new_changed(U256::from(100), U256::ZERO, 1)))
            .collect();
    let deleted_account = Account {
        info: AccountInfo::default(),
        transaction_id: 1,
        storage: deleted_storage,
        status: AccountStatus::Touched,
    };
    let mut update2 = EvmState::default();
    update2.insert(address, deleted_account);
    multi_keys.record_removals(&update2);

    // The deletion must now be visible on the account's storage tracker.
    let keys_after_delete = multi_keys.get_storage(&hashed_addr).unwrap();
    assert!(keys_after_delete.is_removed(&hashed_slot));

    // Step 3: the slot is written again, this time with 200.
    let recreated_storage =
        std::iter::once((slot, EvmStorageSlot::new_changed(U256::ZERO, U256::from(200), 2)))
            .collect();
    let recreated_account = Account {
        info: AccountInfo::default(),
        transaction_id: 2,
        storage: recreated_storage,
        status: AccountStatus::Touched,
    };
    let mut update3 = EvmState::default();
    update3.insert(address, recreated_account);
    multi_keys.record_removals(&update3);

    // The point of this test: unlike `update_with_state`, `record_removals` never clears a
    // removal. Once a key was removed it stays removed so dependent proofs get invalidated.
    let keys_after_recreate = multi_keys.get_storage(&hashed_addr).unwrap();
    assert!(
        keys_after_recreate.is_removed(&hashed_slot),
        "slot should remain marked as removed even after recreation"
    );
}
/// Checks that `record_removals` flags a self-destructed account as removed.
#[test]
fn test_record_removals_selfdestruct() {
    use alloy_primitives::Address;
    use revm_state::{Account, AccountInfo, AccountStatus};

    let address = Address::random();
    let hashed_addr = keccak256(address);

    // The account carries both flags: SelfDestructed is the condition under test, and
    // Touched is needed for the account to be processed at all.
    let destroyed_account = Account {
        info: AccountInfo::default(),
        transaction_id: 0,
        storage: Default::default(),
        status: AccountStatus::SelfDestructed | AccountStatus::Touched,
    };
    let mut update = EvmState::default();
    update.insert(address, destroyed_account);

    let mut multi_keys = MultiAddedRemovedKeys::new();
    multi_keys.record_removals(&update);

    // The hashed address must show up in the account-level removed set.
    let account_keys = multi_keys.get_accounts();
    assert!(account_keys.is_removed(&hashed_addr));
}
/// Checks that `record_removals` does not record a zeroed slot when the owning account is
/// not flagged as touched.
#[test]
fn test_record_removals_ignores_untouched() {
    use alloy_primitives::Address;
    use revm_state::{Account, AccountInfo, AccountStatus, EvmStorageSlot};

    let address = Address::random();
    let slot = U256::from(1);
    let hashed_addr = keccak256(address);
    let hashed_slot = keccak256(B256::from(slot));

    // The slot itself looks like a deletion (100 -> 0), but the account status is left at
    // its default, so the Touched flag is absent.
    let zeroed_storage =
        std::iter::once((slot, EvmStorageSlot::new_changed(U256::from(100), U256::ZERO, 0)))
            .collect();
    let untouched_account = Account {
        info: AccountInfo::default(),
        transaction_id: 0,
        storage: zeroed_storage,
        status: AccountStatus::default(), // NOT touched
    };
    let mut update = EvmState::default();
    update.insert(address, untouched_account);

    let mut multi_keys = MultiAddedRemovedKeys::new();
    multi_keys.record_removals(&update);

    // Either no storage tracker was created for the account, or it exists without the slot
    // being flagged; in both cases the removal must not have been recorded.
    assert!(!multi_keys
        .get_storage(&hashed_addr)
        .is_some_and(|keys| keys.is_removed(&hashed_slot)));
}
#[test]
fn test_update_with_state_account_with_balance() {
let mut multi_keys = MultiAddedRemovedKeys::new();