perf(engine, trie): chunk based on idle workers

This commit is contained in:
Alex Radchenko
2025-11-12 12:20:30 +01:00
committed by yongkangc
parent ed3a8a03d5
commit 9b8d5f2e32
5 changed files with 234 additions and 24 deletions

View File

@@ -0,0 +1,80 @@
//! Generic chunk planning utilities for multiproof scheduling in the engine.
/// A computed plan for splitting `total` items into `chunks` parts.
///
/// Sizes are distributed as evenly as possible:
/// - `base` items per chunk
/// - the first `increased` chunks get one extra item (size `base + 1`)
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(super) struct ChunkPlan {
/// Number of chunks to produce.
pub chunks: usize,
/// Base size for each chunk.
pub base: usize,
/// Number of leading chunks that receive an extra item.
pub increased: usize,
}
impl ChunkPlan {
/// Returns the size for the `index`-th chunk (0-based).
#[cfg(test)]
pub(super) fn chunk_size(&self, index: usize) -> usize {
self.base + usize::from(index < self.increased)
}
/// Returns an iterator of chunk sizes with length `self.chunks`.
#[cfg(test)]
pub(super) fn sizes(&self) -> impl Iterator<Item = usize> + '_ {
(0..self.chunks).map(|i| self.chunk_size(i))
}
}
/// Computes a chunk plan given the total amount of items, the number of idle workers
/// available to process chunks concurrently, and a minimum chunk size.
pub(super) fn compute_chunk_plan(total: usize, idle: usize, min_chunk: usize) -> Option<ChunkPlan> {
if idle == 0 || total == 0 {
return None;
}
let max_chunks_amount = total / min_chunk;
let chunks = max_chunks_amount.min(idle);
(chunks >= 2).then(|| {
let base = total / chunks;
let increased = total % chunks;
ChunkPlan { chunks, base, increased }
})
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn no_idle_no_plan() {
assert_eq!(compute_chunk_plan(10, 0, 1), None);
assert_eq!(compute_chunk_plan(0, 4, 1), None);
}
#[test]
fn chunks_respect_min_chunk_and_idle() {
// total=10, idle=3, min_chunk=4 -> max_chunks=2 -> chunks=2 -> base=5, increased=0
let plan = compute_chunk_plan(10, 3, 4).unwrap();
assert_eq!(plan, ChunkPlan { chunks: 2, base: 5, increased: 0 });
assert_eq!(plan.sizes().collect::<Vec<_>>(), vec![5, 5]);
}
#[test]
fn remainder_distributed_to_leading_chunks() {
// total=10, idle=4, min_chunk=3 -> max_chunks=3 -> chunks=3 -> base=3, increased=1
let plan = compute_chunk_plan(10, 4, 3).unwrap();
assert_eq!(plan, ChunkPlan { chunks: 3, base: 3, increased: 1 });
assert_eq!(plan.sizes().collect::<Vec<_>>(), vec![4, 3, 3]);
}
#[test]
fn no_chunk_if_only_one_possible() {
// total=5, idle=8, min_chunk=3 -> max_chunks=1 -> chunks=1 -> None
assert_eq!(compute_chunk_plan(5, 8, 3), None);
}
}

View File

@@ -51,6 +51,7 @@ use std::{
};
use tracing::{debug, debug_span, instrument, warn, Span};
mod chunking;
mod configured_sparse_trie;
pub mod executor;
pub mod multiproof;

View File

@@ -1,5 +1,6 @@
//! Multiproof task related functionality.
use super::chunking::compute_chunk_plan;
use alloy_evm::block::StateChangeSource;
use alloy_primitives::{
keccak256,
@@ -679,9 +680,10 @@ pub(crate) struct MultiProofTaskMetrics {
/// See the `run()` method documentation for detailed lifecycle flow.
#[derive(Debug)]
pub(super) struct MultiProofTask {
/// The size of proof targets chunk to spawn in one calculation.
/// If None, chunking is disabled and all targets are processed in a single proof.
chunk_size: Option<usize>,
/// Minimum chunk size when splitting work across idle workers.
/// Chunking is only applied if at least 2 chunks can be formed and
/// each chunk would be at least this size. If `None`, chunking is disabled.
min_chunk_size: Option<usize>,
/// Receiver for state root related messages (prefetch, state updates, finish signal).
rx: CrossbeamReceiver<MultiProofMessage>,
/// Sender for state root related messages.
@@ -712,14 +714,14 @@ impl MultiProofTask {
pub(super) fn new(
proof_worker_handle: ProofWorkerHandle,
to_sparse_trie: std::sync::mpsc::Sender<SparseTrieUpdate>,
chunk_size: Option<usize>,
min_chunk_size: Option<usize>,
) -> Self {
let (tx, rx) = unbounded();
let (proof_result_tx, proof_result_rx) = unbounded();
let metrics = MultiProofTaskMetrics::default();
Self {
chunk_size,
min_chunk_size,
rx,
tx,
proof_result_rx,
@@ -776,11 +778,11 @@ impl MultiProofTask {
let num_chunks = dispatch_with_chunking(
proof_targets,
chunking_len,
self.chunk_size,
self.min_chunk_size,
self.max_targets_for_chunking,
available_account_workers,
available_storage_workers,
MultiProofTargets::chunks,
|targets, base, increased| targets.chunks_with(base, increased),
|proof_targets| {
self.multiproof_manager.dispatch(
MultiproofInput {
@@ -917,11 +919,11 @@ impl MultiProofTask {
let num_chunks = dispatch_with_chunking(
not_fetched_state_update,
chunking_len,
self.chunk_size,
self.min_chunk_size,
self.max_targets_for_chunking,
available_account_workers,
available_storage_workers,
HashedPostState::chunks,
|state, base, increased| state.chunks_with(base, increased),
|hashed_state_update| {
let proof_targets = get_proof_targets(
&hashed_state_update,
@@ -1452,15 +1454,19 @@ fn get_proof_targets(
/// Dispatches work items as a single unit or in chunks based on target size and worker
/// availability.
///
/// Uses `compute_chunk_plan` to determine optimal chunk distribution based on available
/// idle workers and minimum chunk size. This ensures work is distributed evenly across
/// workers without creating too many small chunks.
#[allow(clippy::too_many_arguments)]
fn dispatch_with_chunking<T, I>(
items: T,
chunking_len: usize,
chunk_size: Option<usize>,
min_chunk_size: Option<usize>,
max_targets_for_chunking: usize,
available_account_workers: usize,
available_storage_workers: usize,
chunker: impl FnOnce(T, usize) -> I,
chunker: impl FnOnce(T, usize, usize) -> I,
mut dispatch: impl FnMut(T),
) -> usize
where
@@ -1470,16 +1476,17 @@ where
available_account_workers > 1 ||
available_storage_workers > 1;
if should_chunk &&
let Some(chunk_size) = chunk_size &&
chunking_len > chunk_size
{
let mut num_chunks = 0usize;
for chunk in chunker(items, chunk_size) {
dispatch(chunk);
num_chunks += 1;
if should_chunk && let Some(min_chunk) = min_chunk_size {
// Use idle account workers to determine optimal chunk distribution
let idle_workers = available_account_workers;
if let Some(plan) = compute_chunk_plan(chunking_len, idle_workers, min_chunk) {
let mut num_chunks = 0usize;
for chunk in chunker(items, plan.base, plan.increased) {
dispatch(chunk);
num_chunks += 1;
}
return num_chunks;
}
return num_chunks;
}
dispatch(items);

View File

@@ -279,6 +279,12 @@ impl HashedPostState {
ChunkedHashedPostState::new(self, size)
}
/// Returns an iterator that yields chunks where the first `increased` chunks have size
/// `size + 1` and the remaining have size `size`.
pub fn chunks_with(self, size: usize, increased: usize) -> ChunkedHashedPostState {
ChunkedHashedPostState::new_with_increased(self, size, increased)
}
/// Returns the number of items that will be considered during chunking in `[Self::chunks]`.
pub fn chunking_length(&self) -> usize {
self.accounts.len() +
@@ -692,6 +698,7 @@ impl From<HashedPostStateSorted> for HashedPostState {
pub struct ChunkedHashedPostState {
flattened: alloc::vec::IntoIter<(B256, FlattenedHashedPostStateItem)>,
size: usize,
increased: usize,
}
#[derive(Debug)]
@@ -731,7 +738,17 @@ impl ChunkedHashedPostState {
// 3. Account update
.sorted_by_key(|(address, _)| *address);
Self { flattened, size }
Self { flattened, size, increased: 0 }
}
fn new_with_increased(
hashed_post_state: HashedPostState,
size: usize,
increased: usize,
) -> Self {
let mut this = Self::new(hashed_post_state, size);
this.increased = increased;
this
}
}
@@ -741,8 +758,10 @@ impl Iterator for ChunkedHashedPostState {
fn next(&mut self) -> Option<Self::Item> {
let mut chunk = HashedPostState::default();
let extra = usize::from(self.increased > 0);
let mut current_size = 0;
while current_size < self.size {
while current_size < (self.size + extra) {
let Some((address, item)) = self.flattened.next() else { break };
match item {
@@ -763,6 +782,9 @@ impl Iterator for ChunkedHashedPostState {
if chunk.is_empty() {
None
} else {
if extra == 1 {
self.increased -= 1;
}
Some(chunk)
}
}
@@ -776,6 +798,27 @@ mod tests {
use revm_database::{states::StorageSlot, StorageWithOriginalValues};
use revm_state::{AccountInfo, Bytecode};
// Local copy for testing since chunking utilities are defined in the engine crate now.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
struct ChunkPlan {
chunks: usize,
base: usize,
increased: usize,
}
fn compute_chunk_plan(total: usize, idle: usize, min_chunk: usize) -> Option<ChunkPlan> {
if idle == 0 || total == 0 {
return None;
}
let max_chunks_amount = total / min_chunk;
let chunks = max_chunks_amount.min(idle);
(chunks >= 2).then(|| {
let base = total / chunks;
let increased = total % chunks;
ChunkPlan { chunks, base, increased }
})
}
#[test]
fn hashed_state_wiped_extension() {
let hashed_address = B256::default();
@@ -1529,4 +1572,41 @@ mod tests {
assert_eq!(storage.storage.len(), 3);
assert!(storage.wiped);
}
#[test]
fn chunking_distribution_matches_plan() {
// Build a hashed post state with:
// addr1: wiped + 2 storage updates -> 3 items
// addr2: 1 storage update -> 1 item
// addr3: account update (None) -> 1 item
// total = 5 items
let addr1 = B256::with_last_byte(0xA1);
let addr2 = B256::with_last_byte(0xA2);
let addr3 = B256::with_last_byte(0xA3);
let mut state = HashedPostState::default();
let mut st1 = HashedStorage::new(true);
st1.storage.insert(B256::with_last_byte(0x01), U256::from(1));
st1.storage.insert(B256::with_last_byte(0x02), U256::from(2));
state.storages.insert(addr1, st1);
let mut st2 = HashedStorage::new(false);
st2.storage.insert(B256::with_last_byte(0x03), U256::from(3));
state.storages.insert(addr2, st2);
state.accounts.insert(addr3, None);
assert_eq!(state.chunking_length(), 5);
// idle=2, min=2 -> chunks=2 -> base=2, increased=1 -> sizes [3,2]
let plan = compute_chunk_plan(5, 2, 2).unwrap();
assert_eq!(plan, ChunkPlan { chunks: 2, base: 2, increased: 1 });
let sizes = state
.chunks_with(plan.base, plan.increased)
.map(|chunk| chunk.chunking_length())
.collect::<Vec<_>>();
assert_eq!(sizes, vec![3, 2]);
assert_eq!(sizes.len(), plan.chunks);
}
}

View File

@@ -90,6 +90,12 @@ impl MultiProofTargets {
ChunkedMultiProofTargets::new(self, size)
}
/// Returns an iterator that yields chunks where the first `increased` chunks have size
/// `size + 1` and the remaining have size `size`.
pub fn chunks_with(self, size: usize, increased: usize) -> ChunkedMultiProofTargets {
ChunkedMultiProofTargets::new_with_increased(self, size, increased)
}
/// Returns the number of items that will be considered during chunking in `[Self::chunks]`.
pub fn chunking_length(&self) -> usize {
self.values().map(|slots| 1 + slots.len().saturating_sub(1)).sum::<usize>()
@@ -120,6 +126,7 @@ impl MultiProofTargets {
pub struct ChunkedMultiProofTargets {
flattened_targets: alloc::vec::IntoIter<(B256, Option<B256>)>,
size: usize,
increased: usize,
}
impl ChunkedMultiProofTargets {
@@ -139,7 +146,13 @@ impl ChunkedMultiProofTargets {
}
})
.sorted();
Self { flattened_targets, size }
Self { flattened_targets, size, increased: 0 }
}
fn new_with_increased(targets: MultiProofTargets, size: usize, increased: usize) -> Self {
let mut this = Self::new(targets, size);
this.increased = increased;
this
}
}
@@ -147,7 +160,9 @@ impl Iterator for ChunkedMultiProofTargets {
type Item = MultiProofTargets;
fn next(&mut self) -> Option<Self::Item> {
let chunk = self.flattened_targets.by_ref().take(self.size).fold(
let extra = usize::from(self.increased > 0);
let chunk = self.flattened_targets.by_ref().take(self.size + extra).fold(
MultiProofTargets::default(),
|mut acc, (address, slot)| {
let entry = acc.entry(address).or_default();
@@ -161,6 +176,9 @@ impl Iterator for ChunkedMultiProofTargets {
if chunk.is_empty() {
None
} else {
if extra == 1 {
self.increased -= 1;
}
Some(chunk)
}
}
@@ -1101,4 +1119,28 @@ mod tests {
);
}
}
#[test]
fn no_workers_no_chunk_plan() {
fn compute_chunk_plan(
total: usize,
idle: usize,
min_chunk: usize,
) -> Option<(usize, usize, usize)> {
if idle == 0 || total == 0 {
return None;
}
let max_chunks_amount = total / min_chunk;
let chunks = max_chunks_amount.min(idle);
(chunks >= 2).then(|| {
let base = total / chunks;
let increased = total % chunks;
(chunks, base, increased)
})
}
assert!(compute_chunk_plan(10, 0, 3).is_none());
assert!(compute_chunk_plan(0, 2, 1).is_none());
// With workers but too small total for 2 chunks -> max_chunks=1 -> None
assert!(compute_chunk_plan(3, 4, 3).is_none());
}
}