Mirror of https://github.com/paradigmxyz/reth.git (synced 2026-01-10 07:48:19 -05:00)
chore(engine): remove state update batching in multiproof (#20842)
Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
@@ -21,7 +21,7 @@ use reth_trie_parallel::{
AccountMultiproofInput, ProofResultContext, ProofResultMessage, ProofWorkerHandle,
},
};
use std::{collections::BTreeMap, mem, ops::DerefMut, sync::Arc, time::Instant};
use std::{collections::BTreeMap, ops::DerefMut, sync::Arc, time::Instant};
use tracing::{debug, error, instrument, trace};

/// Source of state changes, either from EVM execution or from a Block Access List.
@@ -57,14 +57,6 @@ const PREFETCH_MAX_BATCH_TARGETS: usize = 512;
/// Prevents excessive batching even with small messages.
const PREFETCH_MAX_BATCH_MESSAGES: usize = 16;

/// Maximum number of targets to batch together for state updates.
/// Lower than prefetch because state updates require additional processing (hashing, state
/// partitioning) before dispatch.
const STATE_UPDATE_MAX_BATCH_TARGETS: usize = 64;

/// Preallocation hint for state update batching to avoid repeated reallocations on small bursts.
const STATE_UPDATE_BATCH_PREALLOC: usize = 16;

/// The default max targets, for limiting the number of account and storage proof targets to be
/// fetched by a single worker. If exceeded, chunking is forced regardless of worker availability.
const DEFAULT_MAX_TARGETS_FOR_CHUNKING: usize = 300;
@@ -440,8 +432,6 @@ pub(crate) struct MultiProofTaskMetrics {

/// Histogram of prefetch proof batch sizes (number of messages merged).
pub prefetch_batch_size_histogram: Histogram,
/// Histogram of state update batch sizes (number of messages merged).
pub state_update_batch_size_histogram: Histogram,

/// Histogram of proof calculation durations.
pub proof_calculation_duration_histogram: Histogram,
@@ -862,11 +852,11 @@ impl MultiProofTask {
.filter(|proof| !proof.is_empty())
}

/// Processes a multiproof message, batching consecutive same-type messages.
/// Processes a multiproof message, batching consecutive prefetch messages.
///
/// Drains queued messages of the same type and merges them into one batch before processing,
/// storing one pending message (different type or over-cap) to handle on the next iteration.
/// This preserves ordering without requeuing onto the channel.
/// For prefetch messages, drains queued prefetch messages and merges them into one batch before
/// processing, storing one pending message (different type or over-cap) to handle on the next
/// iteration. State updates are processed directly without batching.
///
/// Returns `true` if done, `false` to continue.
fn process_multiproof_message<P>(
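As an illustration of the drain-and-merge behavior the new doc comment describes, here is a minimal, self-contained sketch using std::sync::mpsc. The Msg enum, the target map type, MAX_BATCH_MESSAGES, and drain_prefetch_batch are hypothetical stand-ins for the reth types, not the actual implementation: consecutive prefetch messages are pulled off the channel with try_recv and merged, while a message of a different type is handed back as the pending message for the next loop iteration, preserving channel order.

use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver};

// Hypothetical stand-ins for the real message and target types.
enum Msg {
    Prefetch(HashMap<u64, Vec<u64>>), // account key -> storage slot keys
    StateUpdate(u64),                 // opaque payload, enough for the example
}

// Cap mirroring the role of PREFETCH_MAX_BATCH_MESSAGES above.
const MAX_BATCH_MESSAGES: usize = 16;

// Drains consecutive Prefetch messages from `rx` and merges them into `first`.
// A message of a different type is returned as the pending message so it can be
// handled on the next loop iteration, which preserves ordering.
fn drain_prefetch_batch(
    rx: &Receiver<Msg>,
    mut first: HashMap<u64, Vec<u64>>,
) -> (HashMap<u64, Vec<u64>>, Option<Msg>) {
    let mut pending = None;
    let mut merged_messages = 1;
    while merged_messages < MAX_BATCH_MESSAGES {
        match rx.try_recv() {
            Ok(Msg::Prefetch(next)) => {
                for (account, slots) in next {
                    first.entry(account).or_default().extend(slots);
                }
                merged_messages += 1;
            }
            // Different message type: hold it back instead of dropping it.
            Ok(other) => {
                pending = Some(other);
                break;
            }
            // Channel empty or closed: dispatch what we have.
            Err(_) => break,
        }
    }
    (first, pending)
}

fn main() {
    let (tx, rx) = channel();
    tx.send(Msg::Prefetch(HashMap::from([(2, vec![7])]))).unwrap();
    tx.send(Msg::StateUpdate(42)).unwrap();
    let (merged, pending) = drain_prefetch_batch(&rx, HashMap::from([(1, vec![5])]));
    assert_eq!(merged.len(), 2);
    assert!(matches!(pending, Some(Msg::StateUpdate(42))));
}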
@@ -947,7 +937,6 @@ impl MultiProofTask {

false
}
// State update: batch consecutive updates from the same source
MultiProofMessage::StateUpdate(source, update) => {
trace!(target: "engine::tree::payload_processor::multiproof", "processing MultiProofMessage::StateUpdate");

@@ -959,80 +948,14 @@ impl MultiProofTask {
debug!(target: "engine::tree::payload_processor::multiproof", "Started state root calculation");
}

// Accumulate messages including the first one; reuse buffer to avoid allocations.
let mut accumulated_targets = estimate_evm_state_targets(&update);
ctx.accumulated_state_updates.clear();
ctx.accumulated_state_updates.push((source, update));

// Batch consecutive state update messages up to target limit.
while accumulated_targets < STATE_UPDATE_MAX_BATCH_TARGETS {
match self.rx.try_recv() {
Ok(MultiProofMessage::StateUpdate(next_source, next_update)) => {
let (batch_source, batch_update) = &ctx.accumulated_state_updates[0];
if !can_batch_state_update(
*batch_source,
batch_update,
next_source,
&next_update,
) {
ctx.pending_msg =
Some(MultiProofMessage::StateUpdate(next_source, next_update));
break;
}

let next_estimate = estimate_evm_state_targets(&next_update);
// Would exceed batch cap; leave pending to dispatch on next iteration.
if accumulated_targets + next_estimate > STATE_UPDATE_MAX_BATCH_TARGETS
{
ctx.pending_msg =
Some(MultiProofMessage::StateUpdate(next_source, next_update));
break;
}
accumulated_targets += next_estimate;
ctx.accumulated_state_updates.push((next_source, next_update));
}
Ok(other_msg) => {
ctx.pending_msg = Some(other_msg);
break;
}
Err(_) => break,
}
}

// Process all accumulated messages in a single batch
let num_batched = ctx.accumulated_state_updates.len();
self.metrics.state_update_batch_size_histogram.record(num_batched as f64);

#[cfg(debug_assertions)]
{
let batch_source = ctx.accumulated_state_updates[0].0;
let batch_update = &ctx.accumulated_state_updates[0].1;
debug_assert!(ctx.accumulated_state_updates.iter().all(|(source, update)| {
can_batch_state_update(batch_source, batch_update, *source, update)
}));
}

// Merge all accumulated updates into a single EvmState payload.
// Use drain to preserve the buffer allocation.
let mut accumulated_iter = ctx.accumulated_state_updates.drain(..);
let (mut batch_source, mut merged_update) = accumulated_iter
.next()
.expect("state update batch always has at least one entry");
for (next_source, next_update) in accumulated_iter {
batch_source = next_source;
merged_update.extend(next_update);
}

let batch_len = merged_update.len();
batch_metrics.state_update_proofs_requested +=
self.on_state_update(batch_source, merged_update);
let update_len = update.len();
batch_metrics.state_update_proofs_requested += self.on_state_update(source, update);
trace!(
target: "engine::tree::payload_processor::multiproof",
?batch_source,
len = batch_len,
?source,
len = update_len,
state_update_proofs_requested = ?batch_metrics.state_update_proofs_requested,
num_batched,
"Dispatched state update batch"
"Dispatched state update"
);

false
@@ -1168,10 +1091,9 @@ impl MultiProofTask {
/// * Once this message is received, on every [`MultiProofMessage::EmptyProof`] and
///   [`ProofResultMessage`], we check if all proofs have been processed and if there are any
///   pending proofs in the proof sequencer left to be revealed.
/// 6. While running, consecutive [`MultiProofMessage::PrefetchProofs`] and
///    [`MultiProofMessage::StateUpdate`] messages are batched to reduce redundant work; if a
///    different message type arrives mid-batch or a batch cap is reached, it is held as
///    `pending_msg` and processed on the next loop to preserve ordering.
/// 6. While running, consecutive [`MultiProofMessage::PrefetchProofs`] messages are batched to
///    reduce redundant work; if a different message type arrives mid-batch or a batch cap is
///    reached, it is held as `pending_msg` and processed on the next loop to preserve ordering.
/// 7. This task exits after all pending proofs are processed.
#[instrument(
level = "debug",
@@ -1304,7 +1226,7 @@ impl MultiProofTask {
///
/// Contains processing state that persists across loop iterations.
///
/// Used by `process_multiproof_message` to batch consecutive same-type messages received via
/// Used by `process_multiproof_message` to batch consecutive prefetch messages received via
/// `try_recv` for efficient processing.
struct MultiproofBatchCtx {
/// Buffers a non-matching message type encountered during batching.
@@ -1320,8 +1242,6 @@ struct MultiproofBatchCtx {
updates_finished_time: Option<Instant>,
/// Reusable buffer for accumulating prefetch targets during batching.
accumulated_prefetch_targets: Vec<MultiProofTargets>,
/// Reusable buffer for accumulating state updates during batching.
accumulated_state_updates: Vec<(Source, EvmState)>,
}

impl MultiproofBatchCtx {
@@ -1333,7 +1253,6 @@ impl MultiproofBatchCtx {
start,
updates_finished_time: None,
accumulated_prefetch_targets: Vec::with_capacity(PREFETCH_MAX_BATCH_MESSAGES),
accumulated_state_updates: Vec::with_capacity(STATE_UPDATE_BATCH_PREALLOC),
}
}
@@ -1433,66 +1352,6 @@ where
1
}

/// Checks whether two state updates can be merged in a batch.
///
/// Transaction updates with the same transaction ID (`StateChangeSource::Transaction(id)`)
/// are safe to merge because they originate from the same logical execution and can be
/// coalesced to amortize proof work.
fn can_batch_state_update(
batch_source: Source,
batch_update: &EvmState,
next_source: Source,
next_update: &EvmState,
) -> bool {
if !same_source(batch_source, next_source) {
return false;
}

match (batch_source, next_source) {
(
Source::Evm(StateChangeSource::PreBlock(_)),
Source::Evm(StateChangeSource::PreBlock(_)),
) |
(
Source::Evm(StateChangeSource::PostBlock(_)),
Source::Evm(StateChangeSource::PostBlock(_)),
) => batch_update == next_update,
_ => true,
}
}
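For illustration, a simplified, self-contained analogue of the merge rule removed above, using hypothetical SimpleSource and SimpleState stand-ins rather than the real Source and EvmState types (the real check also distinguishes pre- and post-block source kinds via mem::discriminant):

// Hypothetical, simplified analogues of `Source` / `EvmState`.
#[derive(Clone, Copy, PartialEq, Eq)]
enum SimpleSource {
    Transaction(u64),
    PreBlock,
    BlockAccessList,
}

type SimpleState = Vec<(u64, u64)>; // (account, balance) pairs

// Same rule as above: sources must match, and pre-block (likewise post-block)
// updates additionally require identical payloads before they may merge.
fn can_merge(a: (SimpleSource, &SimpleState), b: (SimpleSource, &SimpleState)) -> bool {
    if a.0 != b.0 {
        return false;
    }
    match a.0 {
        SimpleSource::PreBlock => a.1 == b.1,
        _ => true,
    }
}

fn main() {
    let s1: SimpleState = vec![(1, 100)];
    let s2: SimpleState = vec![(2, 200)];
    // Same transaction id merges regardless of payload; different ids never merge.
    assert!(can_merge((SimpleSource::Transaction(7), &s1), (SimpleSource::Transaction(7), &s2)));
    assert!(!can_merge((SimpleSource::Transaction(7), &s1), (SimpleSource::Transaction(8), &s1)));
    // Pre-block updates merge only when the payloads are identical.
    assert!(can_merge((SimpleSource::PreBlock, &s1), (SimpleSource::PreBlock, &s1)));
    assert!(!can_merge((SimpleSource::PreBlock, &s1), (SimpleSource::PreBlock, &s2)));
    // Block access list updates merge unconditionally once the sources match.
    assert!(can_merge((SimpleSource::BlockAccessList, &s1), (SimpleSource::BlockAccessList, &s2)));
}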

/// Checks whether two sources refer to the same origin.
fn same_source(lhs: Source, rhs: Source) -> bool {
match (lhs, rhs) {
(
Source::Evm(StateChangeSource::Transaction(a)),
Source::Evm(StateChangeSource::Transaction(b)),
) => a == b,
(
Source::Evm(StateChangeSource::PreBlock(a)),
Source::Evm(StateChangeSource::PreBlock(b)),
) => mem::discriminant(&a) == mem::discriminant(&b),
(
Source::Evm(StateChangeSource::PostBlock(a)),
Source::Evm(StateChangeSource::PostBlock(b)),
) => mem::discriminant(&a) == mem::discriminant(&b),
(Source::BlockAccessList, Source::BlockAccessList) => true,
_ => false,
}
}

/// Estimates target count from `EvmState` for batching decisions.
fn estimate_evm_state_targets(state: &EvmState) -> usize {
state
.values()
.filter(|account| account.is_touched())
.map(|account| {
let changed_slots = account.storage.iter().filter(|(_, v)| v.is_changed()).count();
1 + changed_slots
})
.sum()
}
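For illustration, a self-contained sketch of the same counting rule with hypothetical stand-in types (one target per touched account, plus one per changed storage slot); SlotChange, SimpleAccount, and estimate_targets are illustrative names, not the revm or reth types:

// Hypothetical, minimal stand-ins for the revm account/storage shapes.
struct SlotChange { changed: bool }
struct SimpleAccount { touched: bool, storage: Vec<SlotChange> }

fn estimate_targets(accounts: &[SimpleAccount]) -> usize {
    accounts
        .iter()
        .filter(|account| account.touched)
        .map(|account| 1 + account.storage.iter().filter(|slot| slot.changed).count())
        .sum()
}

fn main() {
    let state = vec![
        // Touched account with two changed slots and one unchanged slot: 1 + 2 = 3.
        SimpleAccount {
            touched: true,
            storage: vec![
                SlotChange { changed: true },
                SlotChange { changed: true },
                SlotChange { changed: false },
            ],
        },
        // Touched account with no storage changes: 1.
        SimpleAccount { touched: true, storage: vec![] },
        // Untouched account contributes nothing.
        SimpleAccount { touched: false, storage: vec![SlotChange { changed: true }] },
    ];
    assert_eq!(estimate_targets(&state), 4);
}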

#[cfg(test)]
mod tests {
use super::*;
@@ -2040,320 +1899,6 @@ mod tests {
assert_eq!(proofs_requested, 1);
}

/// Verifies that consecutive state update messages from the same source are batched together.
#[test]
fn test_state_update_batching() {
use alloy_evm::block::StateChangeSource;
use revm_state::Account;

let test_provider_factory = create_test_provider_factory();
let mut task = create_test_state_root_task(test_provider_factory);

// create multiple state updates
let addr1 = alloy_primitives::Address::random();
let addr2 = alloy_primitives::Address::random();

let mut update1 = EvmState::default();
update1.insert(
addr1,
Account {
info: revm_state::AccountInfo {
balance: U256::from(100),
nonce: 1,
code_hash: Default::default(),
code: Default::default(),
},
transaction_id: Default::default(),
storage: Default::default(),
status: revm_state::AccountStatus::Touched,
},
);

let mut update2 = EvmState::default();
update2.insert(
addr2,
Account {
info: revm_state::AccountInfo {
balance: U256::from(200),
nonce: 2,
code_hash: Default::default(),
code: Default::default(),
},
transaction_id: Default::default(),
storage: Default::default(),
status: revm_state::AccountStatus::Touched,
},
);

let source = StateChangeSource::Transaction(0);

let tx = task.tx.clone();
tx.send(MultiProofMessage::StateUpdate(source.into(), update1.clone())).unwrap();
tx.send(MultiProofMessage::StateUpdate(source.into(), update2.clone())).unwrap();

let proofs_requested =
if let Ok(MultiProofMessage::StateUpdate(_src, update)) = task.rx.recv() {
let mut merged_update = update;
let mut num_batched = 1;

while let Ok(MultiProofMessage::StateUpdate(_next_source, next_update)) =
task.rx.try_recv()
{
merged_update.extend(next_update);
num_batched += 1;
}

assert_eq!(num_batched, 2);
assert_eq!(merged_update.len(), 2);
assert!(merged_update.contains_key(&addr1));
assert!(merged_update.contains_key(&addr2));

task.on_state_update(source.into(), merged_update)
} else {
panic!("Expected StateUpdate message");
};
assert_eq!(proofs_requested, 1);
}
/// Verifies that state updates from different sources are not batched together.
#[test]
fn test_state_update_batching_separates_sources() {
use alloy_evm::block::StateChangeSource;
use revm_state::Account;

let test_provider_factory = create_test_provider_factory();
let task = create_test_state_root_task(test_provider_factory);

let addr_a1 = alloy_primitives::Address::random();
let addr_b1 = alloy_primitives::Address::random();
let addr_a2 = alloy_primitives::Address::random();

let create_state_update = |addr: alloy_primitives::Address, balance: u64| {
let mut state = EvmState::default();
state.insert(
addr,
Account {
info: revm_state::AccountInfo {
balance: U256::from(balance),
nonce: 1,
code_hash: Default::default(),
code: Default::default(),
},
transaction_id: Default::default(),
storage: Default::default(),
status: revm_state::AccountStatus::Touched,
},
);
state
};

let source_a = StateChangeSource::Transaction(1);
let source_b = StateChangeSource::Transaction(2);

// Queue: A1 (immediate dispatch), B1 (batched), A2 (should become pending)
let tx = task.tx.clone();
tx.send(MultiProofMessage::StateUpdate(source_a.into(), create_state_update(addr_a1, 100)))
.unwrap();
tx.send(MultiProofMessage::StateUpdate(source_b.into(), create_state_update(addr_b1, 200)))
.unwrap();
tx.send(MultiProofMessage::StateUpdate(source_a.into(), create_state_update(addr_a2, 300)))
.unwrap();

let mut pending_msg: Option<MultiProofMessage> = None;

if let Ok(MultiProofMessage::StateUpdate(first_source, _)) = task.rx.recv() {
assert!(same_source(first_source, source_a.into()));

// Simulate batching loop for remaining messages
let mut accumulated_updates: Vec<(Source, EvmState)> = Vec::new();
let mut accumulated_targets = 0usize;

loop {
if accumulated_targets >= STATE_UPDATE_MAX_BATCH_TARGETS {
break;
}
match task.rx.try_recv() {
Ok(MultiProofMessage::StateUpdate(next_source, next_update)) => {
if let Some((batch_source, batch_update)) = accumulated_updates.first() &&
!can_batch_state_update(
*batch_source,
batch_update,
next_source,
&next_update,
)
{
pending_msg =
Some(MultiProofMessage::StateUpdate(next_source, next_update));
break;
}

let next_estimate = estimate_evm_state_targets(&next_update);
if next_estimate > STATE_UPDATE_MAX_BATCH_TARGETS {
pending_msg =
Some(MultiProofMessage::StateUpdate(next_source, next_update));
break;
}
if accumulated_targets + next_estimate > STATE_UPDATE_MAX_BATCH_TARGETS &&
!accumulated_updates.is_empty()
{
pending_msg =
Some(MultiProofMessage::StateUpdate(next_source, next_update));
break;
}
accumulated_targets += next_estimate;
accumulated_updates.push((next_source, next_update));
}
Ok(other_msg) => {
pending_msg = Some(other_msg);
break;
}
Err(_) => break,
}
}

assert_eq!(accumulated_updates.len(), 1, "Should only batch matching sources");
let batch_source = accumulated_updates[0].0;
assert!(same_source(batch_source, source_b.into()));

let batch_source = accumulated_updates[0].0;
let mut merged_update = accumulated_updates.remove(0).1;
for (_, next_update) in accumulated_updates {
merged_update.extend(next_update);
}

assert!(same_source(batch_source, source_b.into()), "Batch should use matching source");
assert!(merged_update.contains_key(&addr_b1));
assert!(!merged_update.contains_key(&addr_a1));
assert!(!merged_update.contains_key(&addr_a2));
} else {
panic!("Expected first StateUpdate");
}

match pending_msg {
Some(MultiProofMessage::StateUpdate(pending_source, pending_update)) => {
assert!(same_source(pending_source, source_a.into()));
assert!(pending_update.contains_key(&addr_a2));
}
other => panic!("Expected pending StateUpdate with source_a, got {:?}", other),
}
}
/// Verifies that pre-block updates only batch when their payloads are identical.
#[test]
fn test_pre_block_updates_require_payload_match_to_batch() {
use alloy_evm::block::{StateChangePreBlockSource, StateChangeSource};
use revm_state::Account;

let test_provider_factory = create_test_provider_factory();
let task = create_test_state_root_task(test_provider_factory);

let addr1 = alloy_primitives::Address::random();
let addr2 = alloy_primitives::Address::random();
let addr3 = alloy_primitives::Address::random();

let create_state_update = |addr: alloy_primitives::Address, balance: u64| {
let mut state = EvmState::default();
state.insert(
addr,
Account {
info: revm_state::AccountInfo {
balance: U256::from(balance),
nonce: 1,
code_hash: Default::default(),
code: Default::default(),
},
transaction_id: Default::default(),
storage: Default::default(),
status: revm_state::AccountStatus::Touched,
},
);
state
};

let source = StateChangeSource::PreBlock(StateChangePreBlockSource::BeaconRootContract);

// Queue: first update dispatched immediately, next two should not merge
let tx = task.tx.clone();
tx.send(MultiProofMessage::StateUpdate(source.into(), create_state_update(addr1, 100)))
.unwrap();
tx.send(MultiProofMessage::StateUpdate(source.into(), create_state_update(addr2, 200)))
.unwrap();
tx.send(MultiProofMessage::StateUpdate(source.into(), create_state_update(addr3, 300)))
.unwrap();

let mut pending_msg: Option<MultiProofMessage> = None;

if let Ok(MultiProofMessage::StateUpdate(first_source, first_update)) = task.rx.recv() {
assert!(same_source(first_source, source.into()));
assert!(first_update.contains_key(&addr1));

let mut accumulated_updates: Vec<(Source, EvmState)> = Vec::new();
let mut accumulated_targets = 0usize;

loop {
if accumulated_targets >= STATE_UPDATE_MAX_BATCH_TARGETS {
break;
}
match task.rx.try_recv() {
Ok(MultiProofMessage::StateUpdate(next_source, next_update)) => {
if let Some((batch_source, batch_update)) = accumulated_updates.first() &&
!can_batch_state_update(
*batch_source,
batch_update,
next_source,
&next_update,
)
{
pending_msg =
Some(MultiProofMessage::StateUpdate(next_source, next_update));
break;
}

let next_estimate = estimate_evm_state_targets(&next_update);
if next_estimate > STATE_UPDATE_MAX_BATCH_TARGETS {
pending_msg =
Some(MultiProofMessage::StateUpdate(next_source, next_update));
break;
}
if accumulated_targets + next_estimate > STATE_UPDATE_MAX_BATCH_TARGETS &&
!accumulated_updates.is_empty()
{
pending_msg =
Some(MultiProofMessage::StateUpdate(next_source, next_update));
break;
}
accumulated_targets += next_estimate;
accumulated_updates.push((next_source, next_update));
}
Ok(other_msg) => {
pending_msg = Some(other_msg);
break;
}
Err(_) => break,
}
}

assert_eq!(
accumulated_updates.len(),
1,
"Second pre-block update should not merge with a different payload"
);
let (batched_source, batched_update) = accumulated_updates.remove(0);
assert!(same_source(batched_source, source.into()));
assert!(batched_update.contains_key(&addr2));
assert!(!batched_update.contains_key(&addr3));

match pending_msg {
Some(MultiProofMessage::StateUpdate(_, pending_update)) => {
assert!(pending_update.contains_key(&addr3));
}
other => panic!("Expected pending third pre-block update, got {:?}", other),
}
} else {
panic!("Expected first StateUpdate");
}
}

/// Verifies that different message types arriving mid-batch are not lost and preserve order.
#[test]
fn test_batching_preserves_ordering_with_different_message_type() {
@@ -2538,6 +2083,7 @@ mod tests {
assert!(matches!(pending, MultiProofMessage::StateUpdate(_, _)));

// Pending message should be handled before the next select loop.
// StateUpdate is processed directly without batching.
assert!(!task.process_multiproof_message(
pending,
&mut ctx,
@@ -2545,161 +2091,16 @@ mod tests {
&test_provider
));

// Prefetch2 should now be in pending_msg (captured by StateUpdate's batching loop).
match ctx.pending_msg.take() {
Some(MultiProofMessage::PrefetchProofs(targets)) => {
// Since StateUpdate doesn't batch, Prefetch2 remains in the channel (not in pending_msg).
assert!(ctx.pending_msg.is_none());

// Prefetch2 should still be in the channel.
match task.rx.try_recv() {
Ok(MultiProofMessage::PrefetchProofs(targets)) => {
assert_eq!(targets.len(), 1);
assert!(targets.contains_key(&prefetch_addr2));
}
other => panic!("Expected remaining PrefetchProofs2 in pending_msg, got {:?}", other),
}
}

/// Verifies that pending messages from a previous batch drain get full batching treatment.
#[test]
fn test_pending_messages_get_full_batching_treatment() {
// Queue: [Prefetch1, State1, State2, State3, Prefetch2]
//
// Expected behavior:
// 1. recv() → Prefetch1
// 2. try_recv() → State1 is different type → pending = State1, break
// 3. Process Prefetch1
// 4. Next iteration: pending = State1 → process with batching
// 5. try_recv() → State2 same type → merge
// 6. try_recv() → State3 same type → merge
// 7. try_recv() → Prefetch2 different type → pending = Prefetch2, break
// 8. Process merged State (1+2+3)
//
// Without the state-machine fix, State1 would be processed alone (no batching).
use alloy_evm::block::StateChangeSource;
use revm_state::Account;

let test_provider_factory = create_test_provider_factory();
let task = create_test_state_root_task(test_provider_factory);

let prefetch_addr1 = B256::random();
let prefetch_addr2 = B256::random();
let state_addr1 = alloy_primitives::Address::random();
let state_addr2 = alloy_primitives::Address::random();
let state_addr3 = alloy_primitives::Address::random();

// Create Prefetch targets
let mut prefetch1 = MultiProofTargets::default();
prefetch1.insert(prefetch_addr1, HashSet::default());

let mut prefetch2 = MultiProofTargets::default();
prefetch2.insert(prefetch_addr2, HashSet::default());

// Create StateUpdates
let create_state_update = |addr: alloy_primitives::Address, balance: u64| {
let mut state = EvmState::default();
state.insert(
addr,
Account {
info: revm_state::AccountInfo {
balance: U256::from(balance),
nonce: 1,
code_hash: Default::default(),
code: Default::default(),
},
transaction_id: Default::default(),
storage: Default::default(),
status: revm_state::AccountStatus::Touched,
},
);
state
};

let source = StateChangeSource::Transaction(42);

// Queue: [Prefetch1, State1, State2, State3, Prefetch2]
let tx = task.tx.clone();
tx.send(MultiProofMessage::PrefetchProofs(prefetch1.clone())).unwrap();
tx.send(MultiProofMessage::StateUpdate(
source.into(),
create_state_update(state_addr1, 100),
))
.unwrap();
tx.send(MultiProofMessage::StateUpdate(
source.into(),
create_state_update(state_addr2, 200),
))
.unwrap();
tx.send(MultiProofMessage::StateUpdate(
source.into(),
create_state_update(state_addr3, 300),
))
.unwrap();
tx.send(MultiProofMessage::PrefetchProofs(prefetch2.clone())).unwrap();

// Simulate the state-machine loop behavior
let mut pending_msg: Option<MultiProofMessage> = None;

// First iteration: recv() gets Prefetch1, drains until State1
if let Ok(MultiProofMessage::PrefetchProofs(targets)) = task.rx.recv() {
let mut merged_targets = targets;
loop {
match task.rx.try_recv() {
Ok(MultiProofMessage::PrefetchProofs(next_targets)) => {
merged_targets.extend(next_targets);
}
Ok(other_msg) => {
pending_msg = Some(other_msg);
break;
}
Err(_) => break,
}
}
// Should have only Prefetch1 (State1 is different type)
assert_eq!(merged_targets.len(), 1);
assert!(merged_targets.contains_key(&prefetch_addr1));
} else {
panic!("Expected PrefetchProofs");
}

// Pending should be State1
assert!(matches!(pending_msg, Some(MultiProofMessage::StateUpdate(_, _))));

// Second iteration: process pending State1 WITH BATCHING
// This is the key test - the pending message should drain State2 and State3
if let Some(MultiProofMessage::StateUpdate(_src, first_update)) = pending_msg.take() {
let mut merged_update = first_update;
let mut num_batched = 1;

loop {
match task.rx.try_recv() {
Ok(MultiProofMessage::StateUpdate(_src, next_update)) => {
merged_update.extend(next_update);
num_batched += 1;
}
Ok(other_msg) => {
pending_msg = Some(other_msg);
break;
}
Err(_) => break,
}
}

// THE KEY ASSERTION: pending State1 should have batched with State2 and State3
assert_eq!(
num_batched, 3,
"Pending message should get full batching treatment and merge all 3 StateUpdates"
);
assert_eq!(merged_update.len(), 3, "Should have all 3 addresses in merged update");
assert!(merged_update.contains_key(&state_addr1));
assert!(merged_update.contains_key(&state_addr2));
assert!(merged_update.contains_key(&state_addr3));
} else {
panic!("Expected pending StateUpdate");
}

// Pending should now be Prefetch2
match pending_msg {
Some(MultiProofMessage::PrefetchProofs(targets)) => {
assert_eq!(targets.len(), 1);
assert!(targets.contains_key(&prefetch_addr2));
}
_ => panic!("Prefetch2 was lost!"),
other => panic!("Expected PrefetchProofs2 in channel, got {:?}", other),
}
}