perf(tree): chunk multiproofs (#14800)

This commit is contained in:
Alexey Shekhirin
2025-03-06 15:45:29 +00:00
committed by GitHub
parent 17d33c04ca
commit a765af9e6b
4 changed files with 478 additions and 76 deletions

View File

@@ -2430,10 +2430,18 @@ where
if self.config.use_state_root_task() {
match handle.state_root() {
Ok(StateRootComputeOutcome { state_root, trie_updates }) => {
let elapsed = execution_finish.elapsed();
info!(target: "engine::tree", ?state_root, ?elapsed, "State root task finished");
// we double check the state root here for good measure
if state_root == block.header().state_root() {
maybe_state_root =
Some((state_root, trie_updates, execution_finish.elapsed()))
maybe_state_root = Some((state_root, trie_updates, elapsed))
} else {
warn!(
target: "engine::tree",
?state_root,
block_state_root = ?block.header().state_root(),
"State root task returned incorrect state root"
);
}
}
Err(error) => {

View File

@@ -1,7 +1,7 @@
//! Multiproof task related functionality.
use crate::tree::payload_processor::{executor::WorkloadExecutor, sparse_trie::SparseTrieEvent};
use alloy_primitives::map::HashSet;
use alloy_primitives::{keccak256, map::HashSet, B256};
use derive_more::derive::Deref;
use metrics::Histogram;
use reth_errors::ProviderError;
@@ -16,7 +16,6 @@ use reth_trie::{
HashedPostStateSorted, HashedStorage, MultiProof, MultiProofTargets, TrieInput,
};
use reth_trie_parallel::proof::ParallelProof;
use revm_primitives::{keccak256, B256};
use std::{
collections::{BTreeMap, VecDeque},
ops::DerefMut,
@@ -28,6 +27,9 @@ use std::{
};
use tracing::{debug, error, trace};
/// The size of proof targets chunk to spawn in one calculation.
const MULTIPROOF_TARGETS_CHUNK_SIZE: usize = 10;
/// A trie update that can be applied to sparse trie alongside the proofs for touched parts of the
/// state.
#[derive(Default, Debug)]
@@ -95,7 +97,10 @@ pub(super) enum MultiProofMessage {
PrefetchProofs(MultiProofTargets),
/// New state update from transaction execution with its source
StateUpdate(StateChangeSource, EvmState),
/// Empty proof for a specific state update
/// State update that can be applied to the sparse trie without any new proofs.
///
/// This can happen when all accounts and storage slots from the state update were already
/// fetched and revealed.
EmptyProof {
/// The index of this proof in the sequence of state updates
sequence_number: u64,
@@ -120,10 +125,6 @@ pub(super) struct ProofCalculated {
sequence_number: u64,
/// Sparse trie update
update: SparseTrieUpdate,
/// Total number of account targets
account_targets: usize,
/// Total number of storage slot targets
storage_targets: usize,
/// The time taken to calculate the proof.
elapsed: Duration,
}
@@ -336,7 +337,7 @@ where
self.executor.spawn_blocking(move || {
let account_targets = proof_targets.len();
let storage_targets = proof_targets.values().map(|slots| slots.len()).sum();
let storage_targets = proof_targets.values().map(|slots| slots.len()).sum::<usize>();
trace!(
target: "engine::root",
@@ -376,8 +377,6 @@ where
state: hashed_state_update,
multiproof: proof,
},
account_targets,
storage_targets,
elapsed,
}),
));
@@ -402,12 +401,22 @@ pub(crate) struct MultiProofTaskMetrics {
/// Histogram of pending multiproofs.
pub pending_multiproofs_histogram: Histogram,
/// Histogram of the number of prefetch proof target accounts.
pub prefetch_proof_targets_accounts_histogram: Histogram,
/// Histogram of the number of prefetch proof target storages.
pub prefetch_proof_targets_storages_histogram: Histogram,
/// Histogram of the number of prefetch proof target chunks.
pub prefetch_proof_chunks_histogram: Histogram,
/// Histogram of the number of state update proof target accounts.
pub state_update_proof_targets_accounts_histogram: Histogram,
/// Histogram of the number of state update proof target storages.
pub state_update_proof_targets_storages_histogram: Histogram,
/// Histogram of the number of state update proof target chunks.
pub state_update_proof_chunks_histogram: Histogram,
/// Histogram of proof calculation durations.
pub proof_calculation_duration_histogram: Histogram,
/// Histogram of proof calculation account targets.
pub proof_calculation_account_targets_histogram: Histogram,
/// Histogram of proof calculation storage targets.
pub proof_calculation_storage_targets_histogram: Histogram,
/// Histogram of sparse trie update durations.
pub sparse_trie_update_duration_histogram: Histogram,
@@ -482,40 +491,56 @@ where
}
/// Handles request for proof prefetch.
fn on_prefetch_proof(&mut self, targets: MultiProofTargets) {
///
/// Returns the number of proofs that were spawned.
fn on_prefetch_proof(&mut self, targets: MultiProofTargets) -> u64 {
let proof_targets = self.get_prefetch_proof_targets(targets);
self.fetched_proof_targets.extend_ref(&proof_targets);
self.multiproof_manager.spawn_or_queue(MultiproofInput {
config: self.config.clone(),
source: None,
hashed_state_update: Default::default(),
proof_targets,
proof_sequence_number: self.proof_sequencer.next_sequence(),
state_root_message_sender: self.tx.clone(),
});
self.metrics.prefetch_proof_targets_accounts_histogram.record(proof_targets.len() as f64);
self.metrics
.prefetch_proof_targets_storages_histogram
.record(proof_targets.values().map(|slots| slots.len()).sum::<usize>() as f64);
// Process proof targets in chunks.
let mut chunks = 0;
for proof_targets_chunk in proof_targets.chunks(MULTIPROOF_TARGETS_CHUNK_SIZE) {
self.multiproof_manager.spawn_or_queue(MultiproofInput {
config: self.config.clone(),
source: None,
hashed_state_update: Default::default(),
proof_targets: proof_targets_chunk,
proof_sequence_number: self.proof_sequencer.next_sequence(),
state_root_message_sender: self.tx.clone(),
});
chunks += 1;
}
self.metrics.prefetch_proof_chunks_histogram.record(chunks as f64);
chunks
}
// Returns true if all state updates finished and all proofs processed.
fn is_done(
&self,
proofs_processed: u64,
updates_received: u64,
prefetch_proofs_received: u64,
state_update_proofs_requested: u64,
prefetch_proofs_requested: u64,
updates_finished: bool,
) -> bool {
let all_proofs_received = proofs_processed >= updates_received + prefetch_proofs_received;
let all_proofs_processed =
proofs_processed >= state_update_proofs_requested + prefetch_proofs_requested;
let no_pending = !self.proof_sequencer.has_pending();
debug!(
target: "engine::root",
proofs_processed,
updates_received,
prefetch_proofs_received,
state_update_proofs_requested,
prefetch_proofs_requested,
no_pending,
updates_finished,
"Checking end condition"
);
all_proofs_received && no_pending && updates_finished
all_proofs_processed && no_pending && updates_finished
}
/// Calls `get_proof_targets` with existing proof targets for prefetching.
@@ -570,25 +595,54 @@ where
/// Handles state updates.
///
/// Returns proof targets derived from the state update.
fn on_state_update(
&mut self,
source: StateChangeSource,
update: EvmState,
proof_sequence_number: u64,
) {
/// Returns the number of proofs that were spawned.
fn on_state_update(&mut self, source: StateChangeSource, update: EvmState) -> u64 {
let hashed_state_update = evm_state_to_hashed_post_state(update);
let proof_targets = get_proof_targets(&hashed_state_update, &self.fetched_proof_targets);
self.fetched_proof_targets.extend_ref(&proof_targets);
// Split the state update into already fetched and not fetched according to the proof
// targets.
let (fetched_state_update, not_fetched_state_update) =
hashed_state_update.partition_by_targets(&self.fetched_proof_targets);
self.multiproof_manager.spawn_or_queue(MultiproofInput {
config: self.config.clone(),
source: Some(source),
hashed_state_update,
proof_targets,
proof_sequence_number,
state_root_message_sender: self.tx.clone(),
});
let mut state_updates = 0;
// If there are any accounts or storage slots that we already fetched the proofs for,
// send them immediately, as they don't require spawning any additional multiproofs.
if !fetched_state_update.is_empty() {
let _ = self.tx.send(MultiProofMessage::EmptyProof {
sequence_number: self.proof_sequencer.next_sequence(),
state: fetched_state_update,
});
state_updates += 1;
}
// Process state updates in chunks.
let mut chunks = 0;
let mut spawned_proof_targets = MultiProofTargets::default();
for chunk in not_fetched_state_update.chunks(MULTIPROOF_TARGETS_CHUNK_SIZE) {
let proof_targets = get_proof_targets(&chunk, &self.fetched_proof_targets);
spawned_proof_targets.extend_ref(&proof_targets);
self.multiproof_manager.spawn_or_queue(MultiproofInput {
config: self.config.clone(),
source: Some(source),
hashed_state_update: chunk,
proof_targets,
proof_sequence_number: self.proof_sequencer.next_sequence(),
state_root_message_sender: self.tx.clone(),
});
chunks += 1;
}
self.metrics
.state_update_proof_targets_accounts_histogram
.record(spawned_proof_targets.len() as f64);
self.metrics
.state_update_proof_targets_storages_histogram
.record(spawned_proof_targets.values().map(|slots| slots.len()).sum::<usize>() as f64);
self.metrics.state_update_proof_chunks_histogram.record(chunks as f64);
self.fetched_proof_targets.extend(spawned_proof_targets);
state_updates + chunks
}
/// Handler for new proof calculated, aggregates all the existing sequential proofs.
@@ -646,8 +700,8 @@ where
/// 6. This task exits after all pending proofs are processed.
pub(crate) fn run(mut self) {
// TODO convert those into fields
let mut prefetch_proofs_received = 0;
let mut updates_received = 0;
let mut prefetch_proofs_requested = 0;
let mut state_update_proofs_requested = 0;
let mut proofs_processed = 0;
let mut updates_finished = false;
@@ -663,44 +717,45 @@ where
Ok(message) => match message {
MultiProofMessage::PrefetchProofs(targets) => {
trace!(target: "engine::root", "processing MultiProofMessage::PrefetchProofs");
prefetch_proofs_received += 1;
let account_targets = targets.len();
let storage_targets =
targets.values().map(|slots| slots.len()).sum::<usize>();
prefetch_proofs_requested += self.on_prefetch_proof(targets);
debug!(
target: "engine::root",
targets = targets.len(),
storage_targets = targets.values().map(|slots|
slots.len()).sum::<usize>(),
total_prefetches = prefetch_proofs_received,
account_targets,
storage_targets,
prefetch_proofs_requested,
"Prefetching proofs"
);
self.on_prefetch_proof(targets);
}
MultiProofMessage::StateUpdate(source, update) => {
trace!(target: "engine::root", "processing
MultiProofMessage::StateUpdate");
if updates_received == 0 {
if state_update_proofs_requested == 0 {
first_update_time = Some(Instant::now());
debug!(target: "engine::root", "Started state root calculation");
}
last_update_time = Some(Instant::now());
updates_received += 1;
let len = update.len();
state_update_proofs_requested += self.on_state_update(source, update);
debug!(
target: "engine::root",
?source,
len = update.len(),
total_updates = updates_received,
len,
?state_update_proofs_requested,
"Received new state update"
);
let next_sequence = self.proof_sequencer.next_sequence();
self.on_state_update(source, update, next_sequence);
}
MultiProofMessage::FinishedStateUpdates => {
trace!(target: "engine::root", "processing MultiProofMessage::FinishedStateUpdates");
updates_finished = true;
if self.is_done(
proofs_processed,
updates_received,
prefetch_proofs_received,
state_update_proofs_requested,
prefetch_proofs_requested,
updates_finished,
) {
debug!(
@@ -724,8 +779,8 @@ where
if self.is_done(
proofs_processed,
updates_received,
prefetch_proofs_received,
state_update_proofs_requested,
prefetch_proofs_requested,
updates_finished,
) {
debug!(
@@ -746,12 +801,6 @@ where
self.metrics
.proof_calculation_duration_histogram
.record(proof_calculated.elapsed);
self.metrics
.proof_calculation_account_targets_histogram
.record(proof_calculated.account_targets as f64);
self.metrics
.proof_calculation_storage_targets_histogram
.record(proof_calculated.storage_targets as f64);
debug!(
target: "engine::root",
@@ -770,8 +819,8 @@ where
if self.is_done(
proofs_processed,
updates_received,
prefetch_proofs_received,
state_update_proofs_requested,
prefetch_proofs_requested,
updates_finished,
) {
debug!(
@@ -802,7 +851,7 @@ where
debug!(
target: "engine::root",
total_updates = updates_received,
total_updates = state_update_proofs_requested,
total_proofs = proofs_processed,
total_time =? first_update_time.map(|t|t.elapsed()),
time_from_last_update =?last_update_time.map(|t|t.elapsed()),
@@ -810,7 +859,7 @@ where
);
// update total metrics on finish
self.metrics.state_updates_received_histogram.record(updates_received as f64);
self.metrics.state_updates_received_histogram.record(state_update_proofs_requested as f64);
self.metrics.proofs_processed_histogram.record(proofs_processed as f64);
}
}

View File

@@ -1,3 +1,5 @@
use core::ops::Not;
use crate::{
prefix_set::{PrefixSetMut, TriePrefixSetsMut},
KeyHasher, MultiProofTargets, Nibbles,
@@ -198,6 +200,78 @@ impl HashedPostState {
targets
}
/// Partition the state update into two state updates:
/// - First with accounts and storages slots that are present in the provided targets.
/// - Second with all other accounts and storage slots.
///
/// CAUTION: The state updates are expected to be applied in order, so that the storage wipes
/// are done correctly.
pub fn partition_by_targets(mut self, targets: &MultiProofTargets) -> (Self, Self) {
    let mut state_updates_not_in_targets = Self::default();

    self.storages.retain(|&address, storage| {
        let (retain, storage_not_in_targets) = match targets.get(&address) {
            Some(storage_in_targets) => {
                let mut storage_not_in_targets = HashedStorage::default();
                // Keep only the slots listed in the targets; everything else is moved into
                // the second partition.
                storage.storage.retain(|&slot, value| {
                    if storage_in_targets.contains(&slot) {
                        return true
                    }

                    storage_not_in_targets.storage.insert(slot, *value);
                    false
                });

                // We do not check the wiped flag here, because targets only contain addresses
                // and storage slots. So if there are no storage slots left, the storage update
                // can be fully removed.
                let retain = !storage.storage.is_empty();

                // Since state updates are expected to be applied in order, we can only set the
                // wiped flag in the second storage update if the first storage update is empty
                // and will not be retained.
                if !retain {
                    storage_not_in_targets.wiped = storage.wiped;
                }

                (
                    retain,
                    storage_not_in_targets.is_empty().not().then_some(storage_not_in_targets),
                )
            }
            // Address is not targeted at all: move its whole storage update.
            None => (false, Some(core::mem::take(storage))),
        };

        if let Some(storage_not_in_targets) = storage_not_in_targets {
            state_updates_not_in_targets.storages.insert(address, storage_not_in_targets);

            // Storage update should have an associated account, if it exists.
            // NOTE: this moves the account update into the second partition even when the
            // address itself is in the targets, so the account is applied together with its
            // remaining (untargeted) storage.
            if let Some(account) = self.accounts.remove(&address) {
                state_updates_not_in_targets.accounts.insert(address, account);
            }
        }

        retain
    });

    // Any account that was not already moved above stays in the first partition only if its
    // address is targeted.
    self.accounts.retain(|&address, account| {
        if targets.contains_key(&address) {
            return true
        }

        state_updates_not_in_targets.accounts.insert(address, *account);
        false
    });

    (self, state_updates_not_in_targets)
}
/// Consumes this state update and returns an iterator over chunks containing at most `size`
/// account and storage items each.
///
/// See [`ChunkedHashedPostState`] for the chunking rules and ordering guarantees.
pub fn chunks(self, size: usize) -> ChunkedHashedPostState {
    ChunkedHashedPostState::new(self, size)
}
/// Extend this hashed post state with contents of another.
/// Entries in the second hashed post state take precedence.
pub fn extend(&mut self, other: Self) {
@@ -418,6 +492,93 @@ impl HashedStorageSorted {
}
}
/// An iterator that yields chunks of the state updates of at most `size` account and storage
/// targets.
///
/// # Notes
/// 1. Chunks are expected to be applied in order, because of storage wipes. If applied out of
///    order, it's possible to wipe more storage than in the original state update.
/// 2. For each account, chunks with storage updates come first, followed by account updates.
#[derive(Debug)]
pub struct ChunkedHashedPostState {
    /// The state update flattened into per-address items and sorted by address, so that for
    /// each address the storage wipe (if any) comes first, then storage updates, then the
    /// account update.
    flattened: alloc::vec::IntoIter<(B256, FlattenedHashedPostStateItem)>,
    /// Maximum number of flattened items per yielded chunk.
    size: usize,
}
/// A single flattened entry of a [`HashedPostState`], used by [`ChunkedHashedPostState`] so
/// that accounts and storage slots count towards the chunk size uniformly.
#[derive(Debug)]
enum FlattenedHashedPostStateItem {
    /// An update of the account info.
    Account(Option<Account>),
    /// A wipe of the account's entire storage.
    StorageWipe,
    /// An update of a single storage slot.
    StorageUpdate { slot: B256, value: U256 },
}
impl ChunkedHashedPostState {
    /// Flattens the given state update into a single sorted sequence of per-address items,
    /// ready for chunked iteration.
    fn new(hashed_post_state: HashedPostState, size: usize) -> Self {
        let flattened = hashed_post_state
            .storages
            .into_iter()
            .flat_map(|(address, storage)| {
                // Storage wipes should go first
                Some((address, FlattenedHashedPostStateItem::StorageWipe))
                    .filter(|_| storage.wiped)
                    .into_iter()
                    .chain(
                        // Sort slots so the resulting chunks are deterministic.
                        storage.storage.into_iter().sorted_unstable_by_key(|(slot, _)| *slot).map(
                            move |(slot, value)| {
                                (
                                    address,
                                    FlattenedHashedPostStateItem::StorageUpdate { slot, value },
                                )
                            },
                        ),
                    )
            })
            // Account updates are chained after all storage items, so the stable sort below
            // keeps them last for each address.
            .chain(hashed_post_state.accounts.into_iter().map(|(address, account)| {
                (address, FlattenedHashedPostStateItem::Account(account))
            }))
            // We need stable sort here to preserve the order for each address:
            // 1. Storage wipes
            // 2. Storage updates
            // 3. Account update
            .sorted_by_key(|(address, _)| *address);

        Self { flattened, size }
    }
}
impl Iterator for ChunkedHashedPostState {
    type Item = HashedPostState;

    /// Assembles the next chunk from at most `size` flattened items.
    fn next(&mut self) -> Option<Self::Item> {
        let mut chunk = HashedPostState::default();

        // Drain up to `size` items from the flattened sequence into this chunk.
        for (address, item) in self.flattened.by_ref().take(self.size) {
            match item {
                FlattenedHashedPostStateItem::Account(account) => {
                    chunk.accounts.insert(address, account);
                }
                FlattenedHashedPostStateItem::StorageWipe => {
                    chunk.storages.entry(address).or_default().wiped = true;
                }
                FlattenedHashedPostStateItem::StorageUpdate { slot, value } => {
                    chunk.storages.entry(address).or_default().storage.insert(slot, value);
                }
            }
        }

        // An empty chunk means the flattened sequence is exhausted.
        if chunk.is_empty() {
            None
        } else {
            Some(chunk)
        }
    }
}
#[cfg(test)]
mod tests {
use super::*;
@@ -759,4 +920,111 @@ mod tests {
assert!(target_slots.contains(&slot1));
assert!(target_slots.contains(&slot2));
}
#[test]
fn test_partition_by_targets() {
    let address_a = B256::random();
    let address_b = B256::random();
    let slot_a = B256::random();
    let slot_b = B256::random();

    // Account A has a wiped storage with two slots; account B has no storage.
    let state = HashedPostState {
        accounts: B256Map::from_iter([
            (address_a, Some(Default::default())),
            (address_b, Some(Default::default())),
        ]),
        storages: B256Map::from_iter([(
            address_a,
            HashedStorage {
                wiped: true,
                storage: B256Map::from_iter([(slot_a, U256::ZERO), (slot_b, U256::from(1))]),
            },
        )]),
    };
    // Only slot A of account A is already targeted.
    let targets = MultiProofTargets::from_iter([(address_a, HashSet::from_iter([slot_a]))]);

    let (with_targets, without_targets) = state.partition_by_targets(&targets);

    // First partition keeps the wipe and the targeted slot only; the account update moved out
    // together with the untargeted slot.
    let expected_with = HashedPostState {
        accounts: B256Map::default(),
        storages: B256Map::from_iter([(
            address_a,
            HashedStorage { wiped: true, storage: B256Map::from_iter([(slot_a, U256::ZERO)]) },
        )]),
    };
    assert_eq!(with_targets, expected_with);

    // Second partition gets both account updates and the remaining slot; the wipe flag is not
    // duplicated because the retained first partition already applies it.
    let expected_without = HashedPostState {
        accounts: B256Map::from_iter([
            (address_a, Some(Default::default())),
            (address_b, Some(Default::default())),
        ]),
        storages: B256Map::from_iter([(
            address_a,
            HashedStorage { wiped: false, storage: B256Map::from_iter([(slot_b, U256::from(1))]) },
        )]),
    };
    assert_eq!(without_targets, expected_without);
}
#[test]
fn test_chunks() {
    let address_a = B256::from([1; 32]);
    let address_b = B256::from([2; 32]);
    let slot_a = B256::from([1; 32]);
    let slot_b = B256::from([2; 32]);

    // Account A: account update only. Account B: account update plus wiped storage with two
    // slots.
    let state = HashedPostState {
        accounts: B256Map::from_iter([
            (address_a, Some(Default::default())),
            (address_b, Some(Default::default())),
        ]),
        storages: B256Map::from_iter([(
            address_b,
            HashedStorage {
                wiped: true,
                storage: B256Map::from_iter([(slot_a, U256::ZERO), (slot_b, U256::from(1))]),
            },
        )]),
    };

    let mut chunks = state.chunks(2);

    // Chunk 1: account A's update and account B's storage wipe (wipes come before slot
    // updates).
    assert_eq!(
        chunks.next(),
        Some(HashedPostState {
            accounts: B256Map::from_iter([(address_a, Some(Default::default()))]),
            storages: B256Map::from_iter([(address_b, HashedStorage::new(true))])
        })
    );
    // Chunk 2: both of account B's slot updates, without repeating the wipe.
    assert_eq!(
        chunks.next(),
        Some(HashedPostState {
            accounts: B256Map::default(),
            storages: B256Map::from_iter([(
                address_b,
                HashedStorage {
                    wiped: false,
                    storage: B256Map::from_iter([(slot_a, U256::ZERO), (slot_b, U256::from(1))]),
                },
            )])
        })
    );
    // Chunk 3: account B's account update comes last.
    assert_eq!(
        chunks.next(),
        Some(HashedPostState {
            accounts: B256Map::from_iter([(address_b, Some(Default::default()))]),
            storages: B256Map::default()
        })
    );
    assert_eq!(chunks.next(), None);
}
}

View File

@@ -82,6 +82,83 @@ impl MultiProofTargets {
self.entry(*hashed_address).or_default().extend(hashed_slots);
}
}
/// Consumes the targets and returns an iterator over chunks containing at most `size`
/// account and storage targets each.
///
/// See [`ChunkedMultiProofTargets`] for the exact chunking rules.
pub fn chunks(self, size: usize) -> ChunkedMultiProofTargets {
    ChunkedMultiProofTargets::new(self, size)
}
}
/// An iterator that yields chunks of the proof targets of at most `size` account and storage
/// targets.
///
/// For example, for the following proof targets:
/// ```text
/// - 0x1: [0x10, 0x20, 0x30]
/// - 0x2: [0x40]
/// - 0x3: []
/// ```
///
/// and `size = 2`, the iterator will yield the following chunks:
/// ```text
/// - { 0x1: [0x10, 0x20] }
/// - { 0x1: [0x30], 0x2: [0x40] }
/// - { 0x3: [] }
/// ```
///
/// It follows two rules:
/// - If an account has associated storage slots, each storage slot is counted towards the chunk
///   size.
/// - If an account has no associated storage slots, the account is counted towards the chunk
///   size.
#[derive(Debug)]
pub struct ChunkedMultiProofTargets {
    /// The targets flattened into sorted `(account, slot)` pairs; a `None` slot marks an
    /// account that has no storage targets.
    flattened_targets: alloc::vec::IntoIter<(B256, Option<B256>)>,
    /// Maximum number of flattened targets per yielded chunk.
    size: usize,
}
impl ChunkedMultiProofTargets {
fn new(targets: MultiProofTargets, size: usize) -> Self {
let flattened_targets = targets
.into_iter()
.flat_map(|(address, slots)| {
if slots.is_empty() {
// If the account has no storage slots, we still need to yield the account
// address with empty storage slots. `None` here means that
// there's no storage slot to fetch.
itertools::Either::Left(core::iter::once((address, None)))
} else {
itertools::Either::Right(
slots.into_iter().map(move |slot| (address, Some(slot))),
)
}
})
.sorted();
Self { flattened_targets, size }
}
}
impl Iterator for ChunkedMultiProofTargets {
    type Item = MultiProofTargets;

    /// Assembles the next chunk from at most `size` flattened targets.
    fn next(&mut self) -> Option<Self::Item> {
        let mut chunk = MultiProofTargets::default();

        // Consume up to `size` flattened targets into this chunk.
        for (address, maybe_slot) in self.flattened_targets.by_ref().take(self.size) {
            let slots = chunk.entry(address).or_default();
            // `None` marks an account without storage targets; the empty entry above is all
            // that's needed for it.
            if let Some(slot) = maybe_slot {
                slots.insert(slot);
            }
        }

        // An empty chunk means all targets have been yielded.
        (!chunk.is_empty()).then_some(chunk)
    }
}
/// The state multiproof of target accounts and multiproofs of their storage tries.