From d3b25bba860c7eb401e8c999a973ca606a153f34 Mon Sep 17 00:00:00 2001 From: yongkangc Date: Fri, 19 Dec 2025 03:18:57 +0000 Subject: [PATCH] chore: clean up comments for review --- .../src/tree/payload_processor/multiproof.rs | 75 +++++-------------- 1 file changed, 20 insertions(+), 55 deletions(-) diff --git a/crates/engine/tree/src/tree/payload_processor/multiproof.rs b/crates/engine/tree/src/tree/payload_processor/multiproof.rs index d29e9be9d9..bc84cd8fb6 100644 --- a/crates/engine/tree/src/tree/payload_processor/multiproof.rs +++ b/crates/engine/tree/src/tree/payload_processor/multiproof.rs @@ -67,9 +67,6 @@ const PREFETCH_MAX_BATCH_MESSAGES: usize = 16; /// partitioning) before dispatch. const STATE_UPDATE_MAX_BATCH_TARGETS: usize = 64; -/// Preallocation hint for state update batching to avoid repeated reallocations on small bursts. -const STATE_UPDATE_BATCH_PREALLOC: usize = 16; - /// The default max targets, for limiting the number of account and storage proof targets to be /// fetched by a single worker. If exceeded, chunking is forced regardless of worker availability. 
const DEFAULT_MAX_TARGETS_FOR_CHUNKING: usize = 300; @@ -222,7 +219,6 @@ pub(crate) fn evm_state_to_hashed_post_state(update: EvmState) -> HashedPostStat if destroyed { hashed_state.storages.insert(hashed_address, HashedStorage::new(true)); } else { - // Collect changed storage slots directly to avoid Peekable allocation let changed_storage: Vec<_> = account .storage .into_iter() @@ -1051,13 +1047,15 @@ impl MultiProofTask { debug!(target: "engine::tree::payload_processor::multiproof", "Started state root calculation"); } - let mut accumulated_count = targets.chunking_length(); - ctx.accumulated_prefetch_targets.clear(); - ctx.accumulated_prefetch_targets.push(targets); + // Initialize batch with first targets - merge in-place as messages arrive + let mut merged_targets = targets; + let mut accumulated_count = merged_targets.chunking_length(); + let mut num_batched = 1usize; // Batch consecutive prefetch messages up to limits. + // Merge directly instead of accumulating to Vec to reduce allocations. while accumulated_count < PREFETCH_MAX_BATCH_TARGETS && - ctx.accumulated_prefetch_targets.len() < PREFETCH_MAX_BATCH_MESSAGES + num_batched < PREFETCH_MAX_BATCH_MESSAGES { match self.rx.try_recv() { Ok(MultiProofMessage::PrefetchProofs(next_targets)) => { @@ -1068,7 +1066,9 @@ impl MultiProofTask { break; } accumulated_count += next_count; - ctx.accumulated_prefetch_targets.push(next_targets); + // Merge in-place: next_targets freed immediately + merged_targets.extend(next_targets); + num_batched += 1; } Ok(other_msg) => { ctx.pending_msg = Some(other_msg); @@ -1078,19 +1078,8 @@ impl MultiProofTask { } } - // Process all accumulated messages in a single batch - let num_batched = ctx.accumulated_prefetch_targets.len(); self.metrics.prefetch_batch_size_histogram.record(num_batched as f64); - // Merge all accumulated prefetch targets into a single dispatch payload. - // Use drain to preserve the buffer allocation. 
- let mut accumulated_iter = ctx.accumulated_prefetch_targets.drain(..); - let mut merged_targets = - accumulated_iter.next().expect("prefetch batch always has at least one entry"); - for next_targets in accumulated_iter { - merged_targets.extend(next_targets); - } - let account_targets = merged_targets.len(); let storage_targets = merged_targets.values().map(|slots| slots.len()).sum::<usize>(); @@ -1106,7 +1095,6 @@ impl MultiProofTask { false } - // State update: batch consecutive updates, then hash once MultiProofMessage::StateUpdate(source, update) => { trace!(target: "engine::tree::payload_processor::multiproof", "processing MultiProofMessage::StateUpdate"); @@ -1118,12 +1106,14 @@ impl MultiProofTask { debug!(target: "engine::tree::payload_processor::multiproof", "Started state root calculation"); } - // Accumulate messages including the first one; reuse buffer to avoid allocations. - let mut accumulated_targets = estimate_evm_state_targets(&update); - ctx.accumulated_state_updates.clear(); - ctx.accumulated_state_updates.push((source, update)); + // Initialize batch with first update - merge in-place as messages arrive + let mut batch_source = source; + let mut merged_update = update; + let mut accumulated_targets = estimate_evm_state_targets(&merged_update); + let mut num_batched = 1usize; // Batch consecutive state update messages up to target limit. + // Merge directly instead of accumulating to Vec to reduce allocations. 
while accumulated_targets < STATE_UPDATE_MAX_BATCH_TARGETS { match self.rx.try_recv() { Ok(MultiProofMessage::StateUpdate(next_source, next_update)) => { @@ -1136,7 +1126,10 @@ impl MultiProofTask { break; } accumulated_targets += next_estimate; - ctx.accumulated_state_updates.push((next_source, next_update)); + batch_source = next_source; + // Merge in-place: preserves original_value, overlay freed immediately + merge_evm_state(&mut merged_update, next_update); + num_batched += 1; } Ok(other_msg) => { ctx.pending_msg = Some(other_msg); @@ -1146,22 +1139,8 @@ impl MultiProofTask { } } - // Process all accumulated messages in a single batch - let num_batched = ctx.accumulated_state_updates.len(); self.metrics.state_update_batch_size_histogram.record(num_batched as f64); - // Merge all accumulated EvmState updates. - // Use merge_evm_state instead of extend to preserve original_value, - // ensuring is_changed() returns correct results after batching. - let mut accumulated_iter = ctx.accumulated_state_updates.drain(..); - let (mut batch_source, mut merged_update) = accumulated_iter - .next() - .expect("state update batch always has at least one entry"); - for (next_source, next_update) in accumulated_iter { - batch_source = next_source; - merge_evm_state(&mut merged_update, next_update); - } - // Convert to HashedPostState after merging. let batch_len = merged_update.len(); let hashed_state = evm_state_to_hashed_post_state(merged_update); @@ -1460,26 +1439,12 @@ struct MultiproofBatchCtx { /// Timestamp when state updates finished. `Some` indicates all state updates have been /// received. updates_finished_time: Option<Instant>, - /// Reusable buffer for accumulating prefetch targets during batching. - accumulated_prefetch_targets: Vec<MultiProofTargets>, - /// Reusable buffer for accumulating state updates during batching. - /// - /// Uses `EvmState` with `merge_evm_state` which preserves `original_value` from base state, - /// ensuring `is_changed()` returns correct results after batching. 
- accumulated_state_updates: Vec<(Source, EvmState)>, } impl MultiproofBatchCtx { /// Creates a new batch context with the given start time. fn new(start: Instant) -> Self { - Self { - pending_msg: None, - first_update_time: None, - start, - updates_finished_time: None, - accumulated_prefetch_targets: Vec::with_capacity(PREFETCH_MAX_BATCH_MESSAGES), - accumulated_state_updates: Vec::with_capacity(STATE_UPDATE_BATCH_PREALLOC), - } + Self { pending_msg: None, first_update_time: None, start, updates_finished_time: None } } /// Returns `true` if all state updates have been received.