Compare commits

...

1 Commits

3 changed files with 220 additions and 31 deletions

View File

@@ -114,6 +114,10 @@ harness = false
name = "state_root_task"
harness = false
[[bench]]
name = "sparse_trie_small_work"
harness = false
[features]
test-utils = [
"reth-chain-state/test-utils",

View File

@@ -0,0 +1,131 @@
//! Microbenchmark: serial vs parallel crossover for sparse trie storage operations.
//!
//! The key question: at what `(active_tries, work_per_item)` does Rayon become worthwhile?
//!
//! We sweep both dimensions to produce a crossover map suitable for threshold selection.
use alloy_primitives::{keccak256, B256};
use rayon::iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator};
use revm_primitives::{hash_map::Entry, B256Map};
use std::{hint::black_box, time::Instant};
/// Active tries sweep — fine granularity around expected crossover.
///
/// Each value is the number of items processed per round.
const ACTIVE_TRIES: [usize; 16] = [1, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 24, 28, 32, 48, 64];
/// Simulated per-item work costs in keccak256 iterations.
///
/// Assuming 1 keccak ≈ 200ns, the sweep points model:
///
/// - 1 → ~200ns (trivial: hashmap-only updates)
/// - 5 → ~1µs (light trie update, few nodes)
/// - 25 → ~5µs (moderate trie update)
/// - 50 → ~10µs (heavy trie update / shallow root)
/// - 250 → ~50µs (large storage trie root computation)
/// - 500 → ~100µs (very large storage trie root)
const WORK_SIZES: [usize; 6] = [1, 5, 25, 50, 250, 500];
/// Number of rounds executed inside a single timed trial; the reported per-round time is the
/// trial total divided by this value.
const ROUNDS: usize = 5_000;
/// Number of timed trials recorded for every (work size, active tries, mode) combination.
const TRIALS: usize = 5;
/// Stands in for the cost of one trie operation by chaining keccak256 digests.
///
/// `seed` is written big-endian into the last 8 bytes of an otherwise zeroed 32-byte preimage.
/// That preimage is hashed once, and the digest is then re-hashed `n_hashes - 1` more times.
/// At least one hash is always performed, so `n_hashes == 0` behaves like `n_hashes == 1`.
///
/// Kept out-of-line so every benchmark mode pays the same call cost.
#[inline(never)]
fn simulated_work(seed: u64, n_hashes: usize) -> B256 {
    let mut preimage = [0u8; 32];
    preimage[24..].copy_from_slice(&seed.to_be_bytes());
    // The initial hash counts as round one; the fold supplies the remaining rounds.
    (1..n_hashes).fold(keccak256(preimage), |digest, _| keccak256(digest))
}
fn main() {
eprintln!(
"Sweeping {} active_tries x {} work_sizes x {} trials x {} rounds",
ACTIVE_TRIES.len(),
WORK_SIZES.len(),
TRIALS,
ROUNDS
);
// Warmup rayon pool
let _: B256 = (0..1024u64)
.into_par_iter()
.map(|i| simulated_work(i, 10))
.reduce(|| B256::ZERO, |a, b| a ^ b);
println!("work_hashes,active_tries,mode,trial,total_us,us_per_round");
for &work in &WORK_SIZES {
for &active in &ACTIVE_TRIES {
// Serial
for trial in 0..TRIALS {
let start = Instant::now();
let mut acc = B256::ZERO;
for r in 0..ROUNDS {
for i in 0..active {
acc ^= simulated_work((r * active + i) as u64, work);
}
}
black_box(acc);
let total_us = start.elapsed().as_micros();
let us_per_round = total_us as f64 / ROUNDS as f64;
println!("{work},{active},serial,{trial},{total_us},{us_per_round:.2}");
}
// Parallel with min_len = active (effectively serial in rayon — measures overhead)
for trial in 0..TRIALS {
let start = Instant::now();
let mut acc = B256::ZERO;
for r in 0..ROUNDS {
let round = (0..active)
.into_par_iter()
.with_min_len(active)
.map(|i| simulated_work((r * active + i) as u64, work))
.reduce(|| B256::ZERO, |a, b| a ^ b);
acc ^= round;
}
black_box(acc);
let total_us = start.elapsed().as_micros();
let us_per_round = total_us as f64 / ROUNDS as f64;
println!("{work},{active},par_noop,{trial},{total_us},{us_per_round:.2}");
}
// Parallel with min_len=8 (the production setting)
for trial in 0..TRIALS {
let start = Instant::now();
let mut acc = B256::ZERO;
for r in 0..ROUNDS {
let round = (0..active)
.into_par_iter()
.with_min_len(8)
.map(|i| simulated_work((r * active + i) as u64, work))
.reduce(|| B256::ZERO, |a, b| a ^ b);
acc ^= round;
}
black_box(acc);
let total_us = start.elapsed().as_micros();
let us_per_round = total_us as f64 / ROUNDS as f64;
println!("{work},{active},par_ml8,{trial},{total_us},{us_per_round:.2}");
}
// Parallel with min_len=1 (maximum splitting)
for trial in 0..TRIALS {
let start = Instant::now();
let mut acc = B256::ZERO;
for r in 0..ROUNDS {
let round = (0..active)
.into_par_iter()
.with_min_len(1)
.map(|i| simulated_work((r * active + i) as u64, work))
.reduce(|| B256::ZERO, |a, b| a ^ b);
acc ^= round;
}
black_box(acc);
let total_us = start.elapsed().as_micros();
let us_per_round = total_us as f64 / ROUNDS as f64;
println!("{work},{active},par_ml1,{trial},{total_us},{us_per_round:.2}");
}
}
}
eprintln!("Done.");
}

View File

@@ -10,8 +10,8 @@ use crate::tree::{
use alloy_primitives::B256;
use alloy_rlp::{Decodable, Encodable};
use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
use rayon::iter::ParallelIterator;
use reth_primitives_traits::{Account, FastInstant as Instant, ParallelBridgeBuffered};
use rayon::iter::{IntoParallelIterator, ParallelIterator};
use reth_primitives_traits::{Account, FastInstant as Instant};
use reth_tasks::Runtime;
use reth_trie::{
proof_v2::Target, updates::TrieUpdates, DecodedMultiProofV2, HashedPostState, TrieAccount,
@@ -36,6 +36,18 @@ use tracing::{debug, debug_span, error, instrument};
/// Maximum number of pending/prewarm updates that we accumulate in memory before actually applying.
const MAX_PENDING_UPDATES: usize = 100;
/// Minimum number of active storage tries before leaf updates are fanned out to rayon workers.
///
/// When fewer storage tries than this have pending updates, the updates are applied with a
/// plain serial iterator instead.
///
/// Rayon scheduling overhead (~3µs) is amortized at 2+ tries since real per-trie work
/// (`update_leaves` / root computation) is ~100µs+ for typical storage sizes.
const MIN_ACTIVE_STORAGE_TRIES_FOR_PARALLEL_LEAF_UPDATES: usize = 2;
/// Minimum number of storage tries with uncached roots before roots are computed in parallel.
///
/// When fewer storage tries than this need a root, the roots are computed with a plain serial
/// iterator instead.
///
/// A single `root()` call on a storage trie with 20+ slots costs ~100µs+,
/// so parallel dispatch pays for itself starting at 2 tries.
const MIN_ACTIVE_STORAGE_TRIES_FOR_PARALLEL_ROOTS: usize = 2;
/// Sparse trie task implementation that uses in-memory sparse trie data to schedule proof fetching.
pub(super) struct SparseTrieCacheTask<A = ParallelSparseTrie, S = ParallelSparseTrie> {
/// Sender for proof results.
@@ -489,47 +501,79 @@ where
let storage_updates =
if new { &mut self.new_storage_updates } else { &mut self.storage_updates };
// Process all storage updates in parallel, skipping tries with no pending updates.
let span = tracing::Span::current();
let storage_results = storage_updates
// Collect all storage updates, skipping tries with no pending updates.
// We branch to a serial path for tiny workloads to avoid rayon scheduling overhead.
let storage_work = storage_updates
.iter_mut()
.filter(|(_, updates)| !updates.is_empty())
.map(|(address, updates)| {
let trie = self.trie.take_or_create_storage_trie(address);
let fetched = self.fetched_storage_targets.remove(address).unwrap_or_default();
(address, updates, fetched, trie)
(*address, updates, fetched, trie)
})
.par_bridge_buffered()
.map(|(address, updates, mut fetched, mut trie)| {
let _enter = debug_span!(target: "engine::tree::payload_processor::sparse_trie", parent: &span, "storage_trie_leaf_updates", a=%address).entered();
let mut targets = Vec::new();
.collect::<Vec<_>>();
trie.update_leaves(updates, |path, min_len| match fetched.entry(path) {
Entry::Occupied(mut entry) => {
if min_len < *entry.get() {
entry.insert(min_len);
targets.push(Target::new(path).with_min_len(min_len));
}
}
Entry::Vacant(entry) => {
entry.insert(min_len);
targets.push(Target::new(path).with_min_len(min_len));
}
})?;
let span = tracing::Span::current();
SparseTrieResult::Ok((address, targets, fetched, trie))
})
.collect::<Result<Vec<_>, _>>()?;
let storage_results = if storage_work.len() <
MIN_ACTIVE_STORAGE_TRIES_FOR_PARALLEL_LEAF_UPDATES
{
storage_work
.into_iter()
.map(|(address, updates, mut fetched, mut trie)| {
let _enter = debug_span!(target: "engine::tree::payload_processor::sparse_trie", parent: &span, "storage_trie_leaf_updates", a=%address).entered();
let mut targets = Vec::new();
trie.update_leaves(updates, |path, min_len| match fetched.entry(path) {
Entry::Occupied(mut entry) => {
if min_len < *entry.get() {
entry.insert(min_len);
targets.push(Target::new(path).with_min_len(min_len));
}
}
Entry::Vacant(entry) => {
entry.insert(min_len);
targets.push(Target::new(path).with_min_len(min_len));
}
})?;
SparseTrieResult::Ok((address, targets, fetched, trie))
})
.collect::<Result<Vec<_>, _>>()?
} else {
storage_work
.into_par_iter()
.map(|(address, updates, mut fetched, mut trie)| {
let _enter = debug_span!(target: "engine::tree::payload_processor::sparse_trie", parent: &span, "storage_trie_leaf_updates", a=%address).entered();
let mut targets = Vec::new();
trie.update_leaves(updates, |path, min_len| match fetched.entry(path) {
Entry::Occupied(mut entry) => {
if min_len < *entry.get() {
entry.insert(min_len);
targets.push(Target::new(path).with_min_len(min_len));
}
}
Entry::Vacant(entry) => {
entry.insert(min_len);
targets.push(Target::new(path).with_min_len(min_len));
}
})?;
SparseTrieResult::Ok((address, targets, fetched, trie))
})
.collect::<Result<Vec<_>, _>>()?
};
drop(span);
for (address, targets, fetched, trie) in storage_results {
self.fetched_storage_targets.insert(*address, fetched);
self.trie.insert_storage_trie(*address, trie);
self.fetched_storage_targets.insert(address, fetched);
self.trie.insert_storage_trie(address, trie);
if !targets.is_empty() {
self.pending_targets.extend_storage_targets(address, targets);
self.pending_targets.extend_storage_targets(&address, targets);
}
}
@@ -589,7 +633,7 @@ where
}
let span = debug_span!("compute_storage_roots").entered();
self
let storage_roots_work = self
.trie
.storage_tries_mut()
.iter_mut()
@@ -597,11 +641,21 @@ where
self.storage_updates.get(*address).is_some_and(|updates| updates.is_empty()) &&
!trie.is_root_cached()
})
.par_bridge_buffered()
.for_each(|(address, trie)| {
.collect::<Vec<_>>();
if storage_roots_work.len() < MIN_ACTIVE_STORAGE_TRIES_FOR_PARALLEL_ROOTS {
storage_roots_work.into_iter().for_each(|(address, trie)| {
let _enter = debug_span!(target: "engine::tree::payload_processor::sparse_trie", parent: &span, "storage_root", ?address).entered();
trie.root().expect("updates are drained, trie should be revealed by now");
});
} else {
storage_roots_work
.into_par_iter()
.for_each(|(address, trie)| {
let _enter = debug_span!(target: "engine::tree::payload_processor::sparse_trie", parent: &span, "storage_root", ?address).entered();
trie.root().expect("updates are drained, trie should be revealed by now");
});
}
drop(span);
loop {