mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-04-30 03:01:58 -04:00
Compare commits
1 Commits
devnet4
...
yk/stac-th
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
762ea1417e |
@@ -114,6 +114,10 @@ harness = false
|
||||
name = "state_root_task"
|
||||
harness = false
|
||||
|
||||
[[bench]]
|
||||
name = "sparse_trie_small_work"
|
||||
harness = false
|
||||
|
||||
[features]
|
||||
test-utils = [
|
||||
"reth-chain-state/test-utils",
|
||||
|
||||
131
crates/engine/tree/benches/sparse_trie_small_work.rs
Normal file
131
crates/engine/tree/benches/sparse_trie_small_work.rs
Normal file
@@ -0,0 +1,131 @@
|
||||
//! Microbenchmark: serial vs parallel crossover for sparse trie storage operations.
|
||||
//!
|
||||
//! The key question: at what (active_tries, work_per_item) does Rayon become worthwhile?
|
||||
//!
|
||||
//! We sweep both dimensions to produce a crossover map suitable for threshold selection.
|
||||
|
||||
use alloy_primitives::{keccak256, B256};
|
||||
use rayon::iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator};
|
||||
use revm_primitives::{hash_map::Entry, B256Map};
|
||||
use std::{hint::black_box, time::Instant};
|
||||
|
||||
/// Active tries sweep — fine granularity around expected crossover.
///
/// Each value is the number of simulated tries processed in a single round.
|
||||
const ACTIVE_TRIES: [usize; 16] = [1, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 24, 28, 32, 48, 64];
|
||||
|
||||
/// Simulated per-item work costs in keccak256 iterations.
|
||||
/// 1 keccak ≈ 200ns, so:
|
||||
/// 1 → ~200ns (trivial: hashmap-only updates)
|
||||
/// 5 → ~1µs (light trie update, few nodes)
|
||||
/// 25 → ~5µs (moderate trie update)
|
||||
/// 50 → ~10µs (heavy trie update / shallow root)
|
||||
/// 250 → ~50µs (large storage trie root computation)
|
||||
/// 500 → ~100µs (very large storage trie root)
///
/// NOTE(review): the timings above are the author's estimates, not values measured here.
|
||||
const WORK_SIZES: [usize; 6] = [1, 5, 25, 50, 250, 500];
|
||||
|
||||
/// Number of rounds executed inside one timed trial; the reported `us_per_round`
/// is the trial's total time divided by this value.
const ROUNDS: usize = 5_000;
|
||||
/// Number of timed trials run for each (work size, active tries, mode) combination;
/// every trial prints its own CSV row.
const TRIALS: usize = 5;
|
||||
|
||||
/// Simulates per-trie work by running `n_hashes` keccak256 rounds.
|
||||
#[inline(never)]
|
||||
fn simulated_work(seed: u64, n_hashes: usize) -> B256 {
|
||||
let mut input = [0u8; 32];
|
||||
input[24..].copy_from_slice(&seed.to_be_bytes());
|
||||
let mut h = keccak256(input);
|
||||
for _ in 1..n_hashes {
|
||||
h = keccak256(h);
|
||||
}
|
||||
h
|
||||
}
|
||||
|
||||
fn main() {
|
||||
eprintln!(
|
||||
"Sweeping {} active_tries x {} work_sizes x {} trials x {} rounds",
|
||||
ACTIVE_TRIES.len(),
|
||||
WORK_SIZES.len(),
|
||||
TRIALS,
|
||||
ROUNDS
|
||||
);
|
||||
|
||||
// Warmup rayon pool
|
||||
let _: B256 = (0..1024u64)
|
||||
.into_par_iter()
|
||||
.map(|i| simulated_work(i, 10))
|
||||
.reduce(|| B256::ZERO, |a, b| a ^ b);
|
||||
|
||||
println!("work_hashes,active_tries,mode,trial,total_us,us_per_round");
|
||||
|
||||
for &work in &WORK_SIZES {
|
||||
for &active in &ACTIVE_TRIES {
|
||||
// Serial
|
||||
for trial in 0..TRIALS {
|
||||
let start = Instant::now();
|
||||
let mut acc = B256::ZERO;
|
||||
for r in 0..ROUNDS {
|
||||
for i in 0..active {
|
||||
acc ^= simulated_work((r * active + i) as u64, work);
|
||||
}
|
||||
}
|
||||
black_box(acc);
|
||||
let total_us = start.elapsed().as_micros();
|
||||
let us_per_round = total_us as f64 / ROUNDS as f64;
|
||||
println!("{work},{active},serial,{trial},{total_us},{us_per_round:.2}");
|
||||
}
|
||||
|
||||
// Parallel with min_len = active (effectively serial in rayon — measures overhead)
|
||||
for trial in 0..TRIALS {
|
||||
let start = Instant::now();
|
||||
let mut acc = B256::ZERO;
|
||||
for r in 0..ROUNDS {
|
||||
let round = (0..active)
|
||||
.into_par_iter()
|
||||
.with_min_len(active)
|
||||
.map(|i| simulated_work((r * active + i) as u64, work))
|
||||
.reduce(|| B256::ZERO, |a, b| a ^ b);
|
||||
acc ^= round;
|
||||
}
|
||||
black_box(acc);
|
||||
let total_us = start.elapsed().as_micros();
|
||||
let us_per_round = total_us as f64 / ROUNDS as f64;
|
||||
println!("{work},{active},par_noop,{trial},{total_us},{us_per_round:.2}");
|
||||
}
|
||||
|
||||
// Parallel with min_len=8 (the production setting)
|
||||
for trial in 0..TRIALS {
|
||||
let start = Instant::now();
|
||||
let mut acc = B256::ZERO;
|
||||
for r in 0..ROUNDS {
|
||||
let round = (0..active)
|
||||
.into_par_iter()
|
||||
.with_min_len(8)
|
||||
.map(|i| simulated_work((r * active + i) as u64, work))
|
||||
.reduce(|| B256::ZERO, |a, b| a ^ b);
|
||||
acc ^= round;
|
||||
}
|
||||
black_box(acc);
|
||||
let total_us = start.elapsed().as_micros();
|
||||
let us_per_round = total_us as f64 / ROUNDS as f64;
|
||||
println!("{work},{active},par_ml8,{trial},{total_us},{us_per_round:.2}");
|
||||
}
|
||||
|
||||
// Parallel with min_len=1 (maximum splitting)
|
||||
for trial in 0..TRIALS {
|
||||
let start = Instant::now();
|
||||
let mut acc = B256::ZERO;
|
||||
for r in 0..ROUNDS {
|
||||
let round = (0..active)
|
||||
.into_par_iter()
|
||||
.with_min_len(1)
|
||||
.map(|i| simulated_work((r * active + i) as u64, work))
|
||||
.reduce(|| B256::ZERO, |a, b| a ^ b);
|
||||
acc ^= round;
|
||||
}
|
||||
black_box(acc);
|
||||
let total_us = start.elapsed().as_micros();
|
||||
let us_per_round = total_us as f64 / ROUNDS as f64;
|
||||
println!("{work},{active},par_ml1,{trial},{total_us},{us_per_round:.2}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
eprintln!("Done.");
|
||||
}
|
||||
@@ -10,8 +10,8 @@ use crate::tree::{
|
||||
use alloy_primitives::B256;
|
||||
use alloy_rlp::{Decodable, Encodable};
|
||||
use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
|
||||
use rayon::iter::ParallelIterator;
|
||||
use reth_primitives_traits::{Account, FastInstant as Instant, ParallelBridgeBuffered};
|
||||
use rayon::iter::{IntoParallelIterator, ParallelIterator};
|
||||
use reth_primitives_traits::{Account, FastInstant as Instant};
|
||||
use reth_tasks::Runtime;
|
||||
use reth_trie::{
|
||||
proof_v2::Target, updates::TrieUpdates, DecodedMultiProofV2, HashedPostState, TrieAccount,
|
||||
@@ -36,6 +36,18 @@ use tracing::{debug, debug_span, error, instrument};
|
||||
/// Maximum number of pending/prewarm updates that we accumulate in memory before actually applying.
|
||||
const MAX_PENDING_UPDATES: usize = 100;
|
||||
|
||||
/// Minimum number of active storage tries before leaf updates are fanned out to rayon workers.
|
||||
///
|
||||
/// Rayon scheduling overhead (~3µs) is amortized at 2+ tries since real per-trie work
|
||||
/// (update_leaves / root computation) is ~100µs+ for typical storage sizes.
|
||||
const MIN_ACTIVE_STORAGE_TRIES_FOR_PARALLEL_LEAF_UPDATES: usize = 2;
|
||||
|
||||
/// Minimum number of storage tries with uncached roots before roots are computed in parallel.
|
||||
///
|
||||
/// A single root() call on a storage trie with 20+ slots costs ~100µs+,
|
||||
/// so parallel dispatch pays for itself starting at 2 tries.
|
||||
const MIN_ACTIVE_STORAGE_TRIES_FOR_PARALLEL_ROOTS: usize = 2;
|
||||
|
||||
/// Sparse trie task implementation that uses in-memory sparse trie data to schedule proof fetching.
|
||||
pub(super) struct SparseTrieCacheTask<A = ParallelSparseTrie, S = ParallelSparseTrie> {
|
||||
/// Sender for proof results.
|
||||
@@ -489,47 +501,79 @@ where
|
||||
let storage_updates =
|
||||
if new { &mut self.new_storage_updates } else { &mut self.storage_updates };
|
||||
|
||||
// Process all storage updates in parallel, skipping tries with no pending updates.
|
||||
let span = tracing::Span::current();
|
||||
let storage_results = storage_updates
|
||||
// Collect all storage updates, skipping tries with no pending updates.
|
||||
// We branch to a serial path for tiny workloads to avoid rayon scheduling overhead.
|
||||
let storage_work = storage_updates
|
||||
.iter_mut()
|
||||
.filter(|(_, updates)| !updates.is_empty())
|
||||
.map(|(address, updates)| {
|
||||
let trie = self.trie.take_or_create_storage_trie(address);
|
||||
let fetched = self.fetched_storage_targets.remove(address).unwrap_or_default();
|
||||
|
||||
(address, updates, fetched, trie)
|
||||
(*address, updates, fetched, trie)
|
||||
})
|
||||
.par_bridge_buffered()
|
||||
.map(|(address, updates, mut fetched, mut trie)| {
|
||||
let _enter = debug_span!(target: "engine::tree::payload_processor::sparse_trie", parent: &span, "storage_trie_leaf_updates", a=%address).entered();
|
||||
let mut targets = Vec::new();
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
trie.update_leaves(updates, |path, min_len| match fetched.entry(path) {
|
||||
Entry::Occupied(mut entry) => {
|
||||
if min_len < *entry.get() {
|
||||
entry.insert(min_len);
|
||||
targets.push(Target::new(path).with_min_len(min_len));
|
||||
}
|
||||
}
|
||||
Entry::Vacant(entry) => {
|
||||
entry.insert(min_len);
|
||||
targets.push(Target::new(path).with_min_len(min_len));
|
||||
}
|
||||
})?;
|
||||
let span = tracing::Span::current();
|
||||
|
||||
SparseTrieResult::Ok((address, targets, fetched, trie))
|
||||
})
|
||||
.collect::<Result<Vec<_>, _>>()?;
|
||||
let storage_results = if storage_work.len() <
|
||||
MIN_ACTIVE_STORAGE_TRIES_FOR_PARALLEL_LEAF_UPDATES
|
||||
{
|
||||
storage_work
|
||||
.into_iter()
|
||||
.map(|(address, updates, mut fetched, mut trie)| {
|
||||
let _enter = debug_span!(target: "engine::tree::payload_processor::sparse_trie", parent: &span, "storage_trie_leaf_updates", a=%address).entered();
|
||||
let mut targets = Vec::new();
|
||||
|
||||
trie.update_leaves(updates, |path, min_len| match fetched.entry(path) {
|
||||
Entry::Occupied(mut entry) => {
|
||||
if min_len < *entry.get() {
|
||||
entry.insert(min_len);
|
||||
targets.push(Target::new(path).with_min_len(min_len));
|
||||
}
|
||||
}
|
||||
Entry::Vacant(entry) => {
|
||||
entry.insert(min_len);
|
||||
targets.push(Target::new(path).with_min_len(min_len));
|
||||
}
|
||||
})?;
|
||||
|
||||
SparseTrieResult::Ok((address, targets, fetched, trie))
|
||||
})
|
||||
.collect::<Result<Vec<_>, _>>()?
|
||||
} else {
|
||||
storage_work
|
||||
.into_par_iter()
|
||||
.map(|(address, updates, mut fetched, mut trie)| {
|
||||
let _enter = debug_span!(target: "engine::tree::payload_processor::sparse_trie", parent: &span, "storage_trie_leaf_updates", a=%address).entered();
|
||||
let mut targets = Vec::new();
|
||||
|
||||
trie.update_leaves(updates, |path, min_len| match fetched.entry(path) {
|
||||
Entry::Occupied(mut entry) => {
|
||||
if min_len < *entry.get() {
|
||||
entry.insert(min_len);
|
||||
targets.push(Target::new(path).with_min_len(min_len));
|
||||
}
|
||||
}
|
||||
Entry::Vacant(entry) => {
|
||||
entry.insert(min_len);
|
||||
targets.push(Target::new(path).with_min_len(min_len));
|
||||
}
|
||||
})?;
|
||||
|
||||
SparseTrieResult::Ok((address, targets, fetched, trie))
|
||||
})
|
||||
.collect::<Result<Vec<_>, _>>()?
|
||||
};
|
||||
|
||||
drop(span);
|
||||
|
||||
for (address, targets, fetched, trie) in storage_results {
|
||||
self.fetched_storage_targets.insert(*address, fetched);
|
||||
self.trie.insert_storage_trie(*address, trie);
|
||||
self.fetched_storage_targets.insert(address, fetched);
|
||||
self.trie.insert_storage_trie(address, trie);
|
||||
|
||||
if !targets.is_empty() {
|
||||
self.pending_targets.extend_storage_targets(address, targets);
|
||||
self.pending_targets.extend_storage_targets(&address, targets);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -589,7 +633,7 @@ where
|
||||
}
|
||||
|
||||
let span = debug_span!("compute_storage_roots").entered();
|
||||
self
|
||||
let storage_roots_work = self
|
||||
.trie
|
||||
.storage_tries_mut()
|
||||
.iter_mut()
|
||||
@@ -597,11 +641,21 @@ where
|
||||
self.storage_updates.get(*address).is_some_and(|updates| updates.is_empty()) &&
|
||||
!trie.is_root_cached()
|
||||
})
|
||||
.par_bridge_buffered()
|
||||
.for_each(|(address, trie)| {
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
if storage_roots_work.len() < MIN_ACTIVE_STORAGE_TRIES_FOR_PARALLEL_ROOTS {
|
||||
storage_roots_work.into_iter().for_each(|(address, trie)| {
|
||||
let _enter = debug_span!(target: "engine::tree::payload_processor::sparse_trie", parent: &span, "storage_root", ?address).entered();
|
||||
trie.root().expect("updates are drained, trie should be revealed by now");
|
||||
});
|
||||
} else {
|
||||
storage_roots_work
|
||||
.into_par_iter()
|
||||
.for_each(|(address, trie)| {
|
||||
let _enter = debug_span!(target: "engine::tree::payload_processor::sparse_trie", parent: &span, "storage_root", ?address).entered();
|
||||
trie.root().expect("updates are drained, trie should be revealed by now");
|
||||
});
|
||||
}
|
||||
drop(span);
|
||||
|
||||
loop {
|
||||
|
||||
Reference in New Issue
Block a user