diff --git a/Cargo.lock b/Cargo.lock index adf941b50e..2d2e43b88d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -458,6 +458,7 @@ dependencies = [ "proptest-derive 0.6.0", "rand 0.9.2", "rapidhash", + "rayon", "ruint", "rustc-hash", "serde", @@ -4564,6 +4565,7 @@ dependencies = [ "allocator-api2", "equivalent", "foldhash 0.2.0", + "rayon", "serde", "serde_core", ] @@ -5067,6 +5069,7 @@ dependencies = [ "arbitrary", "equivalent", "hashbrown 0.16.1", + "rayon", "serde", "serde_core", ] diff --git a/crates/engine/tree/src/tree/payload_processor/prewarm.rs b/crates/engine/tree/src/tree/payload_processor/prewarm.rs index 835cffe9e3..f168937bec 100644 --- a/crates/engine/tree/src/tree/payload_processor/prewarm.rs +++ b/crates/engine/tree/src/tree/payload_processor/prewarm.rs @@ -29,6 +29,7 @@ use alloy_evm::Database; use alloy_primitives::{keccak256, map::B256Set, B256}; use crossbeam_channel::Sender as CrossbeamSender; use metrics::{Counter, Gauge, Histogram}; +use rayon::iter::{IntoParallelIterator, ParallelIterator}; use reth_evm::{execute::ExecutableTxFor, ConfigureEvm, Evm, EvmFor, SpecFor}; use reth_execution_types::ExecutionOutcome; use reth_metrics::Metrics; @@ -618,7 +619,8 @@ where let _enter = debug_span!(target: "engine::tree::payload_processor::prewarm", "prewarm outcome", index, tx_hash=%tx.tx().tx_hash()) .entered(); - let (targets, storage_targets) = multiproof_targets_from_state(res.state); + let targets = multiproof_targets_from_state(res.state); + let storage_targets = targets.storage_targets_count(); metrics.prefetch_storage_targets.record(storage_targets as f64); let _ = sender.send(PrewarmTaskEvent::Outcome { proof_targets: Some(targets) }); drop(_enter); @@ -765,37 +767,33 @@ where /// Returns a set of [`MultiProofTargets`] and the total amount of storage targets, based on the /// given state. -fn multiproof_targets_from_state(state: EvmState) -> (MultiProofTargets, usize) { - let mut targets = MultiProofTargets::with_capacity(state.len()); - let mut storage_targets = 0; - for (addr, account) in state { - // if the account was not touched, or if the account was selfdestructed, do not - // fetch proofs for it - // - // Since selfdestruct can only happen in the same transaction, we can skip - // prefetching proofs for selfdestructed accounts - // - // See: https://eips.ethereum.org/EIPS/eip-6780 - if !account.is_touched() || account.is_selfdestructed() { - continue - } - - let mut storage_set = - B256Set::with_capacity_and_hasher(account.storage.len(), Default::default()); - for (key, slot) in account.storage { - // do nothing if unchanged - if !slot.is_changed() { - continue +fn multiproof_targets_from_state(state: EvmState) -> MultiProofTargets { + state + .into_par_iter() + .filter_map(|(address, account)| { + // if the account was not touched, or if the account was selfdestructed, do not + // fetch proofs for it + // + // Since selfdestruct can only happen in the same transaction, we can skip + // prefetching proofs for selfdestructed accounts + // + // See: https://eips.ethereum.org/EIPS/eip-6780 + if !account.is_touched() || account.is_selfdestructed() { + return None; } - storage_set.insert(keccak256(B256::new(key.to_be_bytes()))); - } + let hashed_address = keccak256(address); - storage_targets += storage_set.len(); - targets.insert(keccak256(addr), storage_set); - } + let storage_set: B256Set = account + .storage + .into_iter() + .filter(|(_, slot)| slot.is_changed()) + .map(|(key, _)| keccak256(B256::new(key.to_be_bytes()))) + .collect(); - (targets, storage_targets) + Some((hashed_address, storage_set)) + }) + .collect() } /// The events the pre-warm task can handle. diff --git a/crates/trie/common/Cargo.toml b/crates/trie/common/Cargo.toml index 55dadbab1d..d4c11df3f5 100644 --- a/crates/trie/common/Cargo.toml +++ b/crates/trie/common/Cargo.toml @@ -131,7 +131,7 @@ arbitrary = [ "reth-codecs/arbitrary", "alloy-rpc-types-eth?/arbitrary", ] -rayon = ["dep:rayon"] +rayon = ["dep:rayon", "alloy-primitives/rayon"] [[bench]] name = "prefix_set" diff --git a/crates/trie/common/src/proofs.rs b/crates/trie/common/src/proofs.rs index 4aa4a6200b..7fd6f32bef 100644 --- a/crates/trie/common/src/proofs.rs +++ b/crates/trie/common/src/proofs.rs @@ -16,6 +16,8 @@ use alloy_trie::{ }; use derive_more::{Deref, DerefMut, IntoIterator}; use itertools::Itertools; +#[cfg(feature = "rayon")] +use rayon::iter::{FromParallelIterator, IntoParallelIterator, ParallelIterator}; use reth_primitives_traits::Account; /// Proof targets map. @@ -28,6 +30,16 @@ impl FromIterator<(B256, B256Set)> for MultiProofTargets { } } +#[cfg(feature = "rayon")] +impl FromParallelIterator<(B256, B256Set)> for MultiProofTargets { + fn from_par_iter(par_iter: I) -> Self + where + I: IntoParallelIterator, + { + Self(par_iter.into_par_iter().collect()) + } +} + impl MultiProofTargets { /// Creates an empty `MultiProofTargets` with at least the specified capacity. pub fn with_capacity(capacity: usize) -> Self { @@ -94,6 +106,11 @@ impl MultiProofTargets { pub fn chunking_length(&self) -> usize { self.values().map(|slots| 1 + slots.len().saturating_sub(1)).sum::() } + + /// Returns the total count of storage slot targets across all accounts. + pub fn storage_targets_count(&self) -> usize { + self.values().map(|slots| slots.len()).sum() + } } /// An iterator that yields chunks of the proof targets of at most `size` account and storage