mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-01-08 23:08:19 -05:00
chore: clean up comments for review
This commit is contained in:
@@ -67,9 +67,6 @@ const PREFETCH_MAX_BATCH_MESSAGES: usize = 16;
|
||||
/// partitioning) before dispatch.
|
||||
const STATE_UPDATE_MAX_BATCH_TARGETS: usize = 64;
|
||||
|
||||
/// Preallocation hint for state update batching to avoid repeated reallocations on small bursts.
|
||||
const STATE_UPDATE_BATCH_PREALLOC: usize = 16;
|
||||
|
||||
/// The default max targets, for limiting the number of account and storage proof targets to be
|
||||
/// fetched by a single worker. If exceeded, chunking is forced regardless of worker availability.
|
||||
const DEFAULT_MAX_TARGETS_FOR_CHUNKING: usize = 300;
|
||||
@@ -222,7 +219,6 @@ pub(crate) fn evm_state_to_hashed_post_state(update: EvmState) -> HashedPostStat
|
||||
if destroyed {
|
||||
hashed_state.storages.insert(hashed_address, HashedStorage::new(true));
|
||||
} else {
|
||||
// Collect changed storage slots directly to avoid Peekable allocation
|
||||
let changed_storage: Vec<_> = account
|
||||
.storage
|
||||
.into_iter()
|
||||
@@ -1051,13 +1047,15 @@ impl MultiProofTask {
|
||||
debug!(target: "engine::tree::payload_processor::multiproof", "Started state root calculation");
|
||||
}
|
||||
|
||||
let mut accumulated_count = targets.chunking_length();
|
||||
ctx.accumulated_prefetch_targets.clear();
|
||||
ctx.accumulated_prefetch_targets.push(targets);
|
||||
// Initialize batch with first targets - merge in-place as messages arrive
|
||||
let mut merged_targets = targets;
|
||||
let mut accumulated_count = merged_targets.chunking_length();
|
||||
let mut num_batched = 1usize;
|
||||
|
||||
// Batch consecutive prefetch messages up to limits.
|
||||
// Merge directly instead of accumulating to Vec to reduce allocations.
|
||||
while accumulated_count < PREFETCH_MAX_BATCH_TARGETS &&
|
||||
ctx.accumulated_prefetch_targets.len() < PREFETCH_MAX_BATCH_MESSAGES
|
||||
num_batched < PREFETCH_MAX_BATCH_MESSAGES
|
||||
{
|
||||
match self.rx.try_recv() {
|
||||
Ok(MultiProofMessage::PrefetchProofs(next_targets)) => {
|
||||
@@ -1068,7 +1066,9 @@ impl MultiProofTask {
|
||||
break;
|
||||
}
|
||||
accumulated_count += next_count;
|
||||
ctx.accumulated_prefetch_targets.push(next_targets);
|
||||
// Merge in-place: next_targets freed immediately
|
||||
merged_targets.extend(next_targets);
|
||||
num_batched += 1;
|
||||
}
|
||||
Ok(other_msg) => {
|
||||
ctx.pending_msg = Some(other_msg);
|
||||
@@ -1078,19 +1078,8 @@ impl MultiProofTask {
|
||||
}
|
||||
}
|
||||
|
||||
// Process all accumulated messages in a single batch
|
||||
let num_batched = ctx.accumulated_prefetch_targets.len();
|
||||
self.metrics.prefetch_batch_size_histogram.record(num_batched as f64);
|
||||
|
||||
// Merge all accumulated prefetch targets into a single dispatch payload.
|
||||
// Use drain to preserve the buffer allocation.
|
||||
let mut accumulated_iter = ctx.accumulated_prefetch_targets.drain(..);
|
||||
let mut merged_targets =
|
||||
accumulated_iter.next().expect("prefetch batch always has at least one entry");
|
||||
for next_targets in accumulated_iter {
|
||||
merged_targets.extend(next_targets);
|
||||
}
|
||||
|
||||
let account_targets = merged_targets.len();
|
||||
let storage_targets =
|
||||
merged_targets.values().map(|slots| slots.len()).sum::<usize>();
|
||||
@@ -1106,7 +1095,6 @@ impl MultiProofTask {
|
||||
|
||||
false
|
||||
}
|
||||
// State update: batch consecutive updates, then hash once
|
||||
MultiProofMessage::StateUpdate(source, update) => {
|
||||
trace!(target: "engine::tree::payload_processor::multiproof", "processing MultiProofMessage::StateUpdate");
|
||||
|
||||
@@ -1118,12 +1106,14 @@ impl MultiProofTask {
|
||||
debug!(target: "engine::tree::payload_processor::multiproof", "Started state root calculation");
|
||||
}
|
||||
|
||||
// Accumulate messages including the first one; reuse buffer to avoid allocations.
|
||||
let mut accumulated_targets = estimate_evm_state_targets(&update);
|
||||
ctx.accumulated_state_updates.clear();
|
||||
ctx.accumulated_state_updates.push((source, update));
|
||||
// Initialize batch with first update - merge in-place as messages arrive
|
||||
let mut batch_source = source;
|
||||
let mut merged_update = update;
|
||||
let mut accumulated_targets = estimate_evm_state_targets(&merged_update);
|
||||
let mut num_batched = 1usize;
|
||||
|
||||
// Batch consecutive state update messages up to target limit.
|
||||
// Merge directly instead of accumulating to Vec to reduce allocations.
|
||||
while accumulated_targets < STATE_UPDATE_MAX_BATCH_TARGETS {
|
||||
match self.rx.try_recv() {
|
||||
Ok(MultiProofMessage::StateUpdate(next_source, next_update)) => {
|
||||
@@ -1136,7 +1126,10 @@ impl MultiProofTask {
|
||||
break;
|
||||
}
|
||||
accumulated_targets += next_estimate;
|
||||
ctx.accumulated_state_updates.push((next_source, next_update));
|
||||
batch_source = next_source;
|
||||
// Merge in-place: preserves original_value, overlay freed immediately
|
||||
merge_evm_state(&mut merged_update, next_update);
|
||||
num_batched += 1;
|
||||
}
|
||||
Ok(other_msg) => {
|
||||
ctx.pending_msg = Some(other_msg);
|
||||
@@ -1146,22 +1139,8 @@ impl MultiProofTask {
|
||||
}
|
||||
}
|
||||
|
||||
// Process all accumulated messages in a single batch
|
||||
let num_batched = ctx.accumulated_state_updates.len();
|
||||
self.metrics.state_update_batch_size_histogram.record(num_batched as f64);
|
||||
|
||||
// Merge all accumulated EvmState updates.
|
||||
// Use merge_evm_state instead of extend to preserve original_value,
|
||||
// ensuring is_changed() returns correct results after batching.
|
||||
let mut accumulated_iter = ctx.accumulated_state_updates.drain(..);
|
||||
let (mut batch_source, mut merged_update) = accumulated_iter
|
||||
.next()
|
||||
.expect("state update batch always has at least one entry");
|
||||
for (next_source, next_update) in accumulated_iter {
|
||||
batch_source = next_source;
|
||||
merge_evm_state(&mut merged_update, next_update);
|
||||
}
|
||||
|
||||
// Convert to HashedPostState after merging.
|
||||
let batch_len = merged_update.len();
|
||||
let hashed_state = evm_state_to_hashed_post_state(merged_update);
|
||||
@@ -1460,26 +1439,12 @@ struct MultiproofBatchCtx {
|
||||
/// Timestamp when state updates finished. `Some` indicates all state updates have been
|
||||
/// received.
|
||||
updates_finished_time: Option<Instant>,
|
||||
/// Reusable buffer for accumulating prefetch targets during batching.
|
||||
accumulated_prefetch_targets: Vec<MultiProofTargets>,
|
||||
/// Reusable buffer for accumulating state updates during batching.
|
||||
///
|
||||
/// Uses `EvmState` with `merge_evm_state` which preserves `original_value` from base state,
|
||||
/// ensuring `is_changed()` returns correct results after batching.
|
||||
accumulated_state_updates: Vec<(Source, EvmState)>,
|
||||
}
|
||||
|
||||
impl MultiproofBatchCtx {
|
||||
/// Creates a new batch context with the given start time.
|
||||
fn new(start: Instant) -> Self {
|
||||
Self {
|
||||
pending_msg: None,
|
||||
first_update_time: None,
|
||||
start,
|
||||
updates_finished_time: None,
|
||||
accumulated_prefetch_targets: Vec::with_capacity(PREFETCH_MAX_BATCH_MESSAGES),
|
||||
accumulated_state_updates: Vec::with_capacity(STATE_UPDATE_BATCH_PREALLOC),
|
||||
}
|
||||
Self { pending_msg: None, first_update_time: None, start, updates_finished_time: None }
|
||||
}
|
||||
|
||||
/// Returns `true` if all state updates have been received.
|
||||
|
||||
Reference in New Issue
Block a user