mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-01-08 15:03:58 -05:00
perf(engine): parellelize multiproof_targets_from_state (#20669)
This commit is contained in:
3
Cargo.lock
generated
3
Cargo.lock
generated
@@ -458,6 +458,7 @@ dependencies = [
|
||||
"proptest-derive 0.6.0",
|
||||
"rand 0.9.2",
|
||||
"rapidhash",
|
||||
"rayon",
|
||||
"ruint",
|
||||
"rustc-hash",
|
||||
"serde",
|
||||
@@ -4564,6 +4565,7 @@ dependencies = [
|
||||
"allocator-api2",
|
||||
"equivalent",
|
||||
"foldhash 0.2.0",
|
||||
"rayon",
|
||||
"serde",
|
||||
"serde_core",
|
||||
]
|
||||
@@ -5067,6 +5069,7 @@ dependencies = [
|
||||
"arbitrary",
|
||||
"equivalent",
|
||||
"hashbrown 0.16.1",
|
||||
"rayon",
|
||||
"serde",
|
||||
"serde_core",
|
||||
]
|
||||
|
||||
@@ -29,6 +29,7 @@ use alloy_evm::Database;
|
||||
use alloy_primitives::{keccak256, map::B256Set, B256};
|
||||
use crossbeam_channel::Sender as CrossbeamSender;
|
||||
use metrics::{Counter, Gauge, Histogram};
|
||||
use rayon::iter::{IntoParallelIterator, ParallelIterator};
|
||||
use reth_evm::{execute::ExecutableTxFor, ConfigureEvm, Evm, EvmFor, SpecFor};
|
||||
use reth_execution_types::ExecutionOutcome;
|
||||
use reth_metrics::Metrics;
|
||||
@@ -618,7 +619,8 @@ where
|
||||
let _enter =
|
||||
debug_span!(target: "engine::tree::payload_processor::prewarm", "prewarm outcome", index, tx_hash=%tx.tx().tx_hash())
|
||||
.entered();
|
||||
let (targets, storage_targets) = multiproof_targets_from_state(res.state);
|
||||
let targets = multiproof_targets_from_state(res.state);
|
||||
let storage_targets = targets.storage_targets_count();
|
||||
metrics.prefetch_storage_targets.record(storage_targets as f64);
|
||||
let _ = sender.send(PrewarmTaskEvent::Outcome { proof_targets: Some(targets) });
|
||||
drop(_enter);
|
||||
@@ -765,37 +767,33 @@ where
|
||||
|
||||
/// Returns a set of [`MultiProofTargets`] and the total amount of storage targets, based on the
|
||||
/// given state.
|
||||
fn multiproof_targets_from_state(state: EvmState) -> (MultiProofTargets, usize) {
|
||||
let mut targets = MultiProofTargets::with_capacity(state.len());
|
||||
let mut storage_targets = 0;
|
||||
for (addr, account) in state {
|
||||
// if the account was not touched, or if the account was selfdestructed, do not
|
||||
// fetch proofs for it
|
||||
//
|
||||
// Since selfdestruct can only happen in the same transaction, we can skip
|
||||
// prefetching proofs for selfdestructed accounts
|
||||
//
|
||||
// See: https://eips.ethereum.org/EIPS/eip-6780
|
||||
if !account.is_touched() || account.is_selfdestructed() {
|
||||
continue
|
||||
}
|
||||
|
||||
let mut storage_set =
|
||||
B256Set::with_capacity_and_hasher(account.storage.len(), Default::default());
|
||||
for (key, slot) in account.storage {
|
||||
// do nothing if unchanged
|
||||
if !slot.is_changed() {
|
||||
continue
|
||||
fn multiproof_targets_from_state(state: EvmState) -> MultiProofTargets {
|
||||
state
|
||||
.into_par_iter()
|
||||
.filter_map(|(address, account)| {
|
||||
// if the account was not touched, or if the account was selfdestructed, do not
|
||||
// fetch proofs for it
|
||||
//
|
||||
// Since selfdestruct can only happen in the same transaction, we can skip
|
||||
// prefetching proofs for selfdestructed accounts
|
||||
//
|
||||
// See: https://eips.ethereum.org/EIPS/eip-6780
|
||||
if !account.is_touched() || account.is_selfdestructed() {
|
||||
return None;
|
||||
}
|
||||
|
||||
storage_set.insert(keccak256(B256::new(key.to_be_bytes())));
|
||||
}
|
||||
let hashed_address = keccak256(address);
|
||||
|
||||
storage_targets += storage_set.len();
|
||||
targets.insert(keccak256(addr), storage_set);
|
||||
}
|
||||
let storage_set: B256Set = account
|
||||
.storage
|
||||
.into_iter()
|
||||
.filter(|(_, slot)| slot.is_changed())
|
||||
.map(|(key, _)| keccak256(B256::new(key.to_be_bytes())))
|
||||
.collect();
|
||||
|
||||
(targets, storage_targets)
|
||||
Some((hashed_address, storage_set))
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
/// The events the pre-warm task can handle.
|
||||
|
||||
@@ -131,7 +131,7 @@ arbitrary = [
|
||||
"reth-codecs/arbitrary",
|
||||
"alloy-rpc-types-eth?/arbitrary",
|
||||
]
|
||||
rayon = ["dep:rayon"]
|
||||
rayon = ["dep:rayon", "alloy-primitives/rayon"]
|
||||
|
||||
[[bench]]
|
||||
name = "prefix_set"
|
||||
|
||||
@@ -16,6 +16,8 @@ use alloy_trie::{
|
||||
};
|
||||
use derive_more::{Deref, DerefMut, IntoIterator};
|
||||
use itertools::Itertools;
|
||||
#[cfg(feature = "rayon")]
|
||||
use rayon::iter::{FromParallelIterator, IntoParallelIterator, ParallelIterator};
|
||||
use reth_primitives_traits::Account;
|
||||
|
||||
/// Proof targets map.
|
||||
@@ -28,6 +30,16 @@ impl FromIterator<(B256, B256Set)> for MultiProofTargets {
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "rayon")]
|
||||
impl FromParallelIterator<(B256, B256Set)> for MultiProofTargets {
|
||||
fn from_par_iter<I>(par_iter: I) -> Self
|
||||
where
|
||||
I: IntoParallelIterator<Item = (B256, B256Set)>,
|
||||
{
|
||||
Self(par_iter.into_par_iter().collect())
|
||||
}
|
||||
}
|
||||
|
||||
impl MultiProofTargets {
|
||||
/// Creates an empty `MultiProofTargets` with at least the specified capacity.
|
||||
pub fn with_capacity(capacity: usize) -> Self {
|
||||
@@ -94,6 +106,11 @@ impl MultiProofTargets {
|
||||
pub fn chunking_length(&self) -> usize {
|
||||
self.values().map(|slots| 1 + slots.len().saturating_sub(1)).sum::<usize>()
|
||||
}
|
||||
|
||||
/// Returns the total count of storage slot targets across all accounts.
|
||||
pub fn storage_targets_count(&self) -> usize {
|
||||
self.values().map(|slots| slots.len()).sum()
|
||||
}
|
||||
}
|
||||
|
||||
/// An iterator that yields chunks of the proof targets of at most `size` account and storage
|
||||
|
||||
Reference in New Issue
Block a user