From 9b8d5f2e32bf29557dfea72abb34b94ff5a56614 Mon Sep 17 00:00:00 2001
From: Alex Radchenko
Date: Wed, 12 Nov 2025 12:20:30 +0100
Subject: [PATCH] perf(engine, trie): chunk based on idle workers

---
 .../src/tree/payload_processor/chunking.rs    | 80 ++++++++++++++++++
 .../tree/src/tree/payload_processor/mod.rs    |  1 +
 .../src/tree/payload_processor/multiproof.rs  | 47 ++++++-----
 crates/trie/common/src/hashed_state.rs        | 84 ++++++++++++++++++-
 crates/trie/common/src/proofs.rs              | 46 +++++++++-
 5 files changed, 234 insertions(+), 24 deletions(-)
 create mode 100644 crates/engine/tree/src/tree/payload_processor/chunking.rs

diff --git a/crates/engine/tree/src/tree/payload_processor/chunking.rs b/crates/engine/tree/src/tree/payload_processor/chunking.rs
new file mode 100644
index 0000000000..cc63b281b9
--- /dev/null
+++ b/crates/engine/tree/src/tree/payload_processor/chunking.rs
@@ -0,0 +1,80 @@
+//! Generic chunk planning utilities for multiproof scheduling in the engine.
+
+/// A computed plan for splitting `total` items into `chunks` parts.
+///
+/// Sizes are distributed as evenly as possible:
+/// - `base` items per chunk
+/// - the first `increased` chunks get one extra item (size `base + 1`)
+#[derive(Clone, Copy, Debug, PartialEq, Eq)]
+pub(super) struct ChunkPlan {
+    /// Number of chunks to produce.
+    pub chunks: usize,
+    /// Base size for each chunk.
+    pub base: usize,
+    /// Number of leading chunks that receive an extra item.
+    pub increased: usize,
+}
+
+impl ChunkPlan {
+    /// Returns the size for the `index`-th chunk (0-based).
+    #[cfg(test)]
+    pub(super) fn chunk_size(&self, index: usize) -> usize {
+        self.base + usize::from(index < self.increased)
+    }
+
+    /// Returns an iterator of chunk sizes with length `self.chunks`.
+    #[cfg(test)]
+    pub(super) fn sizes(&self) -> impl Iterator<Item = usize> + '_ {
+        (0..self.chunks).map(|i| self.chunk_size(i))
+    }
+}
+
+/// Computes a chunk plan given the total amount of items, the number of idle workers
+/// available to process chunks concurrently, and a minimum chunk size.
+pub(super) fn compute_chunk_plan(total: usize, idle: usize, min_chunk: usize) -> Option<ChunkPlan> {
+    if idle == 0 || total == 0 {
+        return None;
+    }
+
+    let max_chunks_amount = total / min_chunk;
+    let chunks = max_chunks_amount.min(idle);
+
+    (chunks >= 2).then(|| {
+        let base = total / chunks;
+        let increased = total % chunks;
+        ChunkPlan { chunks, base, increased }
+    })
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn no_idle_no_plan() {
+        assert_eq!(compute_chunk_plan(10, 0, 1), None);
+        assert_eq!(compute_chunk_plan(0, 4, 1), None);
+    }
+
+    #[test]
+    fn chunks_respect_min_chunk_and_idle() {
+        // total=10, idle=3, min_chunk=4 -> max_chunks=2 -> chunks=2 -> base=5, increased=0
+        let plan = compute_chunk_plan(10, 3, 4).unwrap();
+        assert_eq!(plan, ChunkPlan { chunks: 2, base: 5, increased: 0 });
+        assert_eq!(plan.sizes().collect::<Vec<_>>(), vec![5, 5]);
+    }
+
+    #[test]
+    fn remainder_distributed_to_leading_chunks() {
+        // total=10, idle=4, min_chunk=3 -> max_chunks=3 -> chunks=3 -> base=3, increased=1
+        let plan = compute_chunk_plan(10, 4, 3).unwrap();
+        assert_eq!(plan, ChunkPlan { chunks: 3, base: 3, increased: 1 });
+        assert_eq!(plan.sizes().collect::<Vec<_>>(), vec![4, 3, 3]);
+    }
+
+    #[test]
+    fn no_chunk_if_only_one_possible() {
+        // total=5, idle=8, min_chunk=3 -> max_chunks=1 -> chunks=1 -> None
+        assert_eq!(compute_chunk_plan(5, 8, 3), None);
+    }
+}
diff --git a/crates/engine/tree/src/tree/payload_processor/mod.rs b/crates/engine/tree/src/tree/payload_processor/mod.rs
index 98ba7e6f55..ad134d7f0e 100644
--- a/crates/engine/tree/src/tree/payload_processor/mod.rs
+++ b/crates/engine/tree/src/tree/payload_processor/mod.rs
@@ -51,6 +51,7 @@ use std::{
 };
 use tracing::{debug, debug_span, instrument, warn, Span};
 
+mod chunking;
 mod configured_sparse_trie;
 pub mod executor;
 pub mod multiproof;
diff --git a/crates/engine/tree/src/tree/payload_processor/multiproof.rs b/crates/engine/tree/src/tree/payload_processor/multiproof.rs
index 85614646d7..38766441ab 100644
--- a/crates/engine/tree/src/tree/payload_processor/multiproof.rs
+++ b/crates/engine/tree/src/tree/payload_processor/multiproof.rs
@@ -1,5 +1,6 @@
 //! Multiproof task related functionality.
 
+use super::chunking::compute_chunk_plan;
 use alloy_evm::block::StateChangeSource;
 use alloy_primitives::{
     keccak256,
@@ -679,9 +680,10 @@ pub(crate) struct MultiProofTaskMetrics {
 /// See the `run()` method documentation for detailed lifecycle flow.
 #[derive(Debug)]
 pub(super) struct MultiProofTask {
-    /// The size of proof targets chunk to spawn in one calculation.
-    /// If None, chunking is disabled and all targets are processed in a single proof.
-    chunk_size: Option<usize>,
+    /// Minimum chunk size when splitting work across idle workers.
+    /// Chunking is only applied if at least two chunks can be formed and
+    /// each chunk would be at least this size. If `None`, chunking is disabled.
+    min_chunk_size: Option<usize>,
     /// Receiver for state root related messages (prefetch, state updates, finish signal).
     rx: CrossbeamReceiver<MultiProofMessage>,
     /// Sender for state root related messages.
@@ -712,14 +714,14 @@ impl MultiProofTask {
     pub(super) fn new(
         proof_worker_handle: ProofWorkerHandle,
         to_sparse_trie: std::sync::mpsc::Sender<SparseTrieUpdate>,
-        chunk_size: Option<usize>,
+        min_chunk_size: Option<usize>,
     ) -> Self {
         let (tx, rx) = unbounded();
         let (proof_result_tx, proof_result_rx) = unbounded();
         let metrics = MultiProofTaskMetrics::default();
 
         Self {
-            chunk_size,
+            min_chunk_size,
             rx,
             tx,
             proof_result_rx,
@@ -776,11 +778,11 @@ impl MultiProofTask {
         let num_chunks = dispatch_with_chunking(
             proof_targets,
             chunking_len,
-            self.chunk_size,
+            self.min_chunk_size,
             self.max_targets_for_chunking,
             available_account_workers,
             available_storage_workers,
-            MultiProofTargets::chunks,
+            |targets, base, increased| targets.chunks_with(base, increased),
             |proof_targets| {
                 self.multiproof_manager.dispatch(
                     MultiproofInput {
@@ -917,11 +919,11 @@ impl MultiProofTask {
         let num_chunks = dispatch_with_chunking(
             not_fetched_state_update,
             chunking_len,
-            self.chunk_size,
+            self.min_chunk_size,
             self.max_targets_for_chunking,
             available_account_workers,
             available_storage_workers,
-            HashedPostState::chunks,
+            |state, base, increased| state.chunks_with(base, increased),
             |hashed_state_update| {
                 let proof_targets = get_proof_targets(
                     &hashed_state_update,
@@ -1452,15 +1454,19 @@ fn get_proof_targets(
 
 /// Dispatches work items as a single unit or in chunks based on target size and worker
 /// availability.
+///
+/// Uses `compute_chunk_plan` to determine the optimal chunk distribution based on
+/// available idle workers and the minimum chunk size. This ensures work is distributed
+/// evenly across workers without creating too many small chunks.
 #[allow(clippy::too_many_arguments)]
 fn dispatch_with_chunking<T, I>(
     items: T,
     chunking_len: usize,
-    chunk_size: Option<usize>,
+    min_chunk_size: Option<usize>,
     max_targets_for_chunking: usize,
     available_account_workers: usize,
     available_storage_workers: usize,
-    chunker: impl FnOnce(T, usize) -> I,
+    chunker: impl FnOnce(T, usize, usize) -> I,
     mut dispatch: impl FnMut(T),
 ) -> usize
 where
     I: Iterator<Item = T>,
 {
@@ -1470,16 +1476,17 @@
         available_account_workers > 1 ||
         available_storage_workers > 1;
 
-    if should_chunk &&
-        let Some(chunk_size) = chunk_size &&
-        chunking_len > chunk_size
-    {
-        let mut num_chunks = 0usize;
-        for chunk in chunker(items, chunk_size) {
-            dispatch(chunk);
-            num_chunks += 1;
+    if should_chunk && let Some(min_chunk) = min_chunk_size {
+        // Use idle account workers to determine the chunk distribution
+        let idle_workers = available_account_workers;
+        if let Some(plan) = compute_chunk_plan(chunking_len, idle_workers, min_chunk) {
+            let mut num_chunks = 0usize;
+            for chunk in chunker(items, plan.base, plan.increased) {
+                dispatch(chunk);
+                num_chunks += 1;
+            }
+            return num_chunks;
         }
-        return num_chunks;
     }
 
     dispatch(items);
diff --git a/crates/trie/common/src/hashed_state.rs b/crates/trie/common/src/hashed_state.rs
index edfb821bc6..7c4eb8b2bb 100644
--- a/crates/trie/common/src/hashed_state.rs
+++ b/crates/trie/common/src/hashed_state.rs
@@ -279,6 +279,12 @@ impl HashedPostState {
         ChunkedHashedPostState::new(self, size)
     }
 
+    /// Returns an iterator that yields chunks where the first `increased` chunks have size
+    /// `size + 1` and the remaining have size `size`.
+    pub fn chunks_with(self, size: usize, increased: usize) -> ChunkedHashedPostState {
+        ChunkedHashedPostState::new_with_increased(self, size, increased)
+    }
+
     /// Returns the number of items that will be considered during chunking in [`Self::chunks`].
     pub fn chunking_length(&self) -> usize {
         self.accounts.len() +
@@ -692,6 +698,7 @@ impl From for HashedPostState {
 pub struct ChunkedHashedPostState {
     flattened: alloc::vec::IntoIter<(B256, FlattenedHashedPostStateItem)>,
     size: usize,
+    increased: usize,
 }
 
 #[derive(Debug)]
@@ -731,7 +738,17 @@ impl ChunkedHashedPostState {
             // 3. Account update
             .sorted_by_key(|(address, _)| *address);
 
-        Self { flattened, size }
+        Self { flattened, size, increased: 0 }
+    }
+
+    fn new_with_increased(
+        hashed_post_state: HashedPostState,
+        size: usize,
+        increased: usize,
+    ) -> Self {
+        let mut this = Self::new(hashed_post_state, size);
+        this.increased = increased;
+        this
     }
 }
@@ -741,8 +758,10 @@ impl Iterator for ChunkedHashedPostState {
     fn next(&mut self) -> Option<Self::Item> {
         let mut chunk = HashedPostState::default();
 
+        let extra = usize::from(self.increased > 0);
+
         let mut current_size = 0;
-        while current_size < self.size {
+        while current_size < (self.size + extra) {
             let Some((address, item)) = self.flattened.next() else { break };
 
             match item {
@@ -763,6 +782,9 @@
         if chunk.is_empty() {
             None
         } else {
+            if extra == 1 {
+                self.increased -= 1;
+            }
             Some(chunk)
         }
     }
@@ -776,6 +798,27 @@ mod tests {
     use super::*;
     use revm_database::{states::StorageSlot, StorageWithOriginalValues};
     use revm_state::{AccountInfo, Bytecode};
+
+    // Local copy for testing since chunking utilities are defined in the engine crate now.
+    #[derive(Clone, Copy, Debug, PartialEq, Eq)]
+    struct ChunkPlan {
+        chunks: usize,
+        base: usize,
+        increased: usize,
+    }
+
+    fn compute_chunk_plan(total: usize, idle: usize, min_chunk: usize) -> Option<ChunkPlan> {
+        if idle == 0 || total == 0 {
+            return None;
+        }
+        let max_chunks_amount = total / min_chunk;
+        let chunks = max_chunks_amount.min(idle);
+        (chunks >= 2).then(|| {
+            let base = total / chunks;
+            let increased = total % chunks;
+            ChunkPlan { chunks, base, increased }
+        })
+    }
 
     #[test]
     fn hashed_state_wiped_extension() {
         let hashed_address = B256::default();
@@ -1529,4 +1572,41 @@
         assert_eq!(storage.storage.len(), 3);
         assert!(storage.wiped);
     }
+
+    #[test]
+    fn chunking_distribution_matches_plan() {
+        // Build a hashed post state with:
+        // addr1: wiped + 2 storage updates -> 3 items
+        // addr2: 1 storage update -> 1 item
+        // addr3: account update (None) -> 1 item
+        // total = 5 items
+        let addr1 = B256::with_last_byte(0xA1);
+        let addr2 = B256::with_last_byte(0xA2);
+        let addr3 = B256::with_last_byte(0xA3);
+
+        let mut state = HashedPostState::default();
+        let mut st1 = HashedStorage::new(true);
+        st1.storage.insert(B256::with_last_byte(0x01), U256::from(1));
+        st1.storage.insert(B256::with_last_byte(0x02), U256::from(2));
+        state.storages.insert(addr1, st1);
+
+        let mut st2 = HashedStorage::new(false);
+        st2.storage.insert(B256::with_last_byte(0x03), U256::from(3));
+        state.storages.insert(addr2, st2);
+
+        state.accounts.insert(addr3, None);
+
+        assert_eq!(state.chunking_length(), 5);
+
+        // idle=2, min=2 -> chunks=2 -> base=2, increased=1 -> sizes [3,2]
+        let plan = compute_chunk_plan(5, 2, 2).unwrap();
+        assert_eq!(plan, ChunkPlan { chunks: 2, base: 2, increased: 1 });
+
+        let sizes = state
+            .chunks_with(plan.base, plan.increased)
+            .map(|chunk| chunk.chunking_length())
+            .collect::<Vec<_>>();
+        assert_eq!(sizes, vec![3, 2]);
+        assert_eq!(sizes.len(), plan.chunks);
+    }
 }
diff --git a/crates/trie/common/src/proofs.rs b/crates/trie/common/src/proofs.rs
index a8e0bb59b9..94194adf72 100644
--- a/crates/trie/common/src/proofs.rs
+++ b/crates/trie/common/src/proofs.rs
@@ -90,6 +90,12 @@ impl MultiProofTargets {
         ChunkedMultiProofTargets::new(self, size)
     }
 
+    /// Returns an iterator that yields chunks where the first `increased` chunks have size
+    /// `size + 1` and the remaining have size `size`.
+    pub fn chunks_with(self, size: usize, increased: usize) -> ChunkedMultiProofTargets {
+        ChunkedMultiProofTargets::new_with_increased(self, size, increased)
+    }
+
     /// Returns the number of items that will be considered during chunking in [`Self::chunks`].
     pub fn chunking_length(&self) -> usize {
         self.values().map(|slots| 1 + slots.len().saturating_sub(1)).sum::<usize>()
@@ -120,6 +126,7 @@ impl MultiProofTargets {
 pub struct ChunkedMultiProofTargets {
     flattened_targets: alloc::vec::IntoIter<(B256, Option<B256>)>,
     size: usize,
+    increased: usize,
 }
 
 impl ChunkedMultiProofTargets {
@@ -139,7 +146,13 @@ impl ChunkedMultiProofTargets {
                 }
             })
             .sorted();
-        Self { flattened_targets, size }
+        Self { flattened_targets, size, increased: 0 }
+    }
+
+    fn new_with_increased(targets: MultiProofTargets, size: usize, increased: usize) -> Self {
+        let mut this = Self::new(targets, size);
+        this.increased = increased;
+        this
     }
 }
@@ -147,7 +160,9 @@ impl Iterator for ChunkedMultiProofTargets {
     type Item = MultiProofTargets;
 
     fn next(&mut self) -> Option<Self::Item> {
-        let chunk = self.flattened_targets.by_ref().take(self.size).fold(
+        let extra = usize::from(self.increased > 0);
+
+        let chunk = self.flattened_targets.by_ref().take(self.size + extra).fold(
             MultiProofTargets::default(),
             |mut acc, (address, slot)| {
                 let entry = acc.entry(address).or_default();
@@ -161,6 +176,9 @@
         if chunk.is_empty() {
             None
         } else {
+            if extra == 1 {
+                self.increased -= 1;
+            }
             Some(chunk)
         }
     }
@@ -1101,4 +1119,28 @@ mod tests {
             );
         }
     }
+
+    #[test]
+    fn no_workers_no_chunk_plan() {
+        fn compute_chunk_plan(
+            total: usize,
+            idle: usize,
+            min_chunk: usize,
+        ) -> Option<(usize, usize, usize)> {
+            if idle == 0 || total == 0 {
+                return None;
+            }
+            let max_chunks_amount = total / min_chunk;
+            let chunks = max_chunks_amount.min(idle);
+            (chunks >= 2).then(|| {
+                let base = total / chunks;
+                let increased = total % chunks;
+                (chunks, base, increased)
+            })
+        }
+        assert!(compute_chunk_plan(10, 0, 3).is_none());
+        assert!(compute_chunk_plan(0, 2, 1).is_none());
+        // With workers but too small total for 2 chunks -> max_chunks=1 -> None
+        assert!(compute_chunk_plan(3, 4, 3).is_none());
+    }
 }
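
Reviewer note, not part of the patch: the arithmetic in compute_chunk_plan upholds a few invariants worth keeping in mind when tuning min_chunk_size. The chunk sizes sum to total, they differ by at most one, every chunk holds at least min_chunk items (since chunks <= total / min_chunk implies base >= min_chunk), and the chunk count never exceeds idle. A self-contained sketch that exhaustively checks these over a small domain; the function body mirrors the patch, while the main harness below it is illustrative only:

// Standalone mirror of `compute_chunk_plan` from the patch, for experimentation.
// `min_chunk` must be non-zero, as at the patch's call sites.
#[derive(Debug)]
struct ChunkPlan {
    chunks: usize,
    base: usize,
    increased: usize,
}

fn compute_chunk_plan(total: usize, idle: usize, min_chunk: usize) -> Option<ChunkPlan> {
    if idle == 0 || total == 0 {
        return None;
    }
    let max_chunks_amount = total / min_chunk;
    let chunks = max_chunks_amount.min(idle);
    (chunks >= 2).then(|| {
        let base = total / chunks;
        let increased = total % chunks;
        ChunkPlan { chunks, base, increased }
    })
}

fn main() {
    // Exhaustively check the plan invariants over a small domain:
    // sizes sum to `total`, differ by at most one, every chunk meets
    // the minimum size, and the chunk count never exceeds `idle`.
    for total in 1..=64 {
        for idle in 1..=8 {
            for min_chunk in 1..=8 {
                if let Some(p) = compute_chunk_plan(total, idle, min_chunk) {
                    let sizes: Vec<usize> =
                        (0..p.chunks).map(|i| p.base + usize::from(i < p.increased)).collect();
                    assert_eq!(sizes.iter().sum::<usize>(), total);
                    assert!(p.chunks <= idle);
                    assert!(sizes.iter().all(|&s| s >= min_chunk));
                    assert!(sizes.iter().max().unwrap() - sizes.iter().min().unwrap() <= 1);
                }
            }
        }
    }
    println!("all plan invariants hold");
}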
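
Both ChunkedHashedPostState and ChunkedMultiProofTargets consume a plan the same way: while increased > 0, each call to next takes size + 1 items, and increased is decremented only when a non-empty chunk is actually produced. A minimal sketch of that pattern over a plain Vec; the ChunkedVec type here is illustrative only and not part of the patch:

/// Illustrative analogue of the patched iterators: yields chunks of
/// `size + 1` for the first `increased` calls, then chunks of `size`.
struct ChunkedVec<T> {
    items: std::vec::IntoIter<T>,
    size: usize,
    increased: usize,
}

impl<T> Iterator for ChunkedVec<T> {
    type Item = Vec<T>;

    fn next(&mut self) -> Option<Self::Item> {
        // Mirror the patch: bump the chunk size by one while `increased` remains.
        let extra = usize::from(self.increased > 0);
        let chunk: Vec<T> = self.items.by_ref().take(self.size + extra).collect();
        if chunk.is_empty() {
            None
        } else {
            if extra == 1 {
                self.increased -= 1;
            }
            Some(chunk)
        }
    }
}

fn main() {
    // A plan of base=3, increased=1 over 10 items yields sizes [4, 3, 3],
    // matching `remainder_distributed_to_leading_chunks` in chunking.rs.
    let items = (0..10).collect::<Vec<_>>().into_iter();
    let chunked = ChunkedVec { items, size: 3, increased: 1 };
    let sizes: Vec<usize> = chunked.map(|c| c.len()).collect();
    assert_eq!(sizes, vec![4, 3, 3]);
}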
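
dispatch_with_chunking then glues the two halves together: split per the plan when one exists, otherwise fall through to a single dispatch (no idle parallelism, or the chunks would drop below the minimum size). A simplified standalone sketch of that control flow, assuming slices in place of the generic chunker and omitting the max_targets_for_chunking gate; dispatch_with_plan is a hypothetical name, not the reth API:

// Simplified sketch of the `dispatch_with_chunking` control flow: split per
// the plan when one exists, otherwise dispatch the whole batch as one unit.
fn dispatch_with_plan<F: FnMut(&[u64])>(
    items: &[u64],
    idle: usize,
    min_chunk: usize,
    mut dispatch: F,
) -> usize {
    let total = items.len();
    // Inline plan computation, mirroring `compute_chunk_plan` from the patch.
    let chunks =
        if idle == 0 || total == 0 || min_chunk == 0 { 0 } else { (total / min_chunk).min(idle) };
    if chunks >= 2 {
        let (base, increased) = (total / chunks, total % chunks);
        let mut start = 0;
        for i in 0..chunks {
            // Leading `increased` chunks carry one extra item each.
            let end = start + base + usize::from(i < increased);
            dispatch(&items[start..end]);
            start = end;
        }
        return chunks;
    }
    // Fallback: no viable plan, dispatch everything as a single unit.
    dispatch(items);
    1
}

fn main() {
    let items: Vec<u64> = (0..10).collect();
    let mut seen = Vec::new();
    let n = dispatch_with_plan(&items, 4, 3, |chunk| seen.push(chunk.len()));
    assert_eq!((n, seen.clone()), (3, vec![4, 3, 3]));
    // With a single idle worker the whole batch goes out as one dispatch.
    let n = dispatch_with_plan(&items, 1, 3, |chunk| assert_eq!(chunk.len(), 10));
    assert_eq!(n, 1);
}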