From 28409558f9d459a0bb3aaf1edd2df43339a51b20 Mon Sep 17 00:00:00 2001 From: DaniPopes <57450786+DaniPopes@users.noreply.github.com> Date: Mon, 2 Feb 2026 01:58:43 +0100 Subject: [PATCH] perf: add ParallelBridgeBuffered trait to replace par_bridge (#21674) --- crates/engine/tree/Cargo.toml | 2 +- .../src/tree/payload_processor/sparse_trie.rs | 6 ++-- crates/primitives-traits/src/lib.rs | 6 ++++ crates/primitives-traits/src/rayon.rs | 32 +++++++++++++++++++ crates/trie/sparse/Cargo.toml | 1 + crates/trie/sparse/src/state.rs | 10 +++--- testing/ef-tests/Cargo.toml | 2 +- testing/ef-tests/src/cases/blockchain_test.rs | 8 +++-- 8 files changed, 55 insertions(+), 12 deletions(-) create mode 100644 crates/primitives-traits/src/rayon.rs diff --git a/crates/engine/tree/Cargo.toml b/crates/engine/tree/Cargo.toml index 4f2a4540ba..2d46b32252 100644 --- a/crates/engine/tree/Cargo.toml +++ b/crates/engine/tree/Cargo.toml @@ -23,7 +23,7 @@ reth-evm = { workspace = true, features = ["metrics"] } reth-network-p2p.workspace = true reth-payload-builder.workspace = true reth-payload-primitives.workspace = true -reth-primitives-traits.workspace = true +reth-primitives-traits = { workspace = true, features = ["rayon"] } reth-ethereum-primitives.workspace = true reth-provider.workspace = true reth-prune.workspace = true diff --git a/crates/engine/tree/src/tree/payload_processor/sparse_trie.rs b/crates/engine/tree/src/tree/payload_processor/sparse_trie.rs index b80fe2d8ee..0add6df356 100644 --- a/crates/engine/tree/src/tree/payload_processor/sparse_trie.rs +++ b/crates/engine/tree/src/tree/payload_processor/sparse_trie.rs @@ -7,9 +7,9 @@ use crate::tree::{ use alloy_primitives::B256; use alloy_rlp::{Decodable, Encodable}; use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender}; -use rayon::iter::{ParallelBridge, ParallelIterator}; +use rayon::iter::ParallelIterator; use reth_errors::ProviderError; -use reth_primitives_traits::Account; +use reth_primitives_traits::{Account, ParallelBridgeBuffered}; use reth_revm::state::EvmState; use reth_trie::{ proof_v2::Target, updates::TrieUpdates, HashedPostState, Nibbles, TrieAccount, EMPTY_ROOT_HASH, @@ -657,7 +657,7 @@ where .storages .into_iter() .map(|(address, storage)| (address, storage, trie.take_storage_trie(&address))) - .par_bridge() + .par_bridge_buffered() .map(|(address, storage, storage_trie)| { let _enter = debug_span!(target: "engine::tree::payload_processor::sparse_trie", parent: &span, "storage trie", ?address) diff --git a/crates/primitives-traits/src/lib.rs b/crates/primitives-traits/src/lib.rs index 18fb6292bd..4467eeb331 100644 --- a/crates/primitives-traits/src/lib.rs +++ b/crates/primitives-traits/src/lib.rs @@ -188,6 +188,12 @@ pub mod serde_bincode_compat; pub mod size; pub use size::InMemorySize; +/// Rayon utilities +#[cfg(feature = "rayon")] +pub mod rayon; +#[cfg(feature = "rayon")] +pub use rayon::ParallelBridgeBuffered; + /// Node traits pub mod node; pub use node::{BlockTy, BodyTy, HeaderTy, NodePrimitives, ReceiptTy, TxTy}; diff --git a/crates/primitives-traits/src/rayon.rs b/crates/primitives-traits/src/rayon.rs new file mode 100644 index 0000000000..77a03a36b3 --- /dev/null +++ b/crates/primitives-traits/src/rayon.rs @@ -0,0 +1,32 @@ +//! Rayon parallel iterator utilities. + +use alloc::vec::Vec; +use rayon::iter::IntoParallelIterator; + +/// Extension trait for iterators to convert them to parallel iterators via collection. +/// +/// This is an alternative to [`rayon::iter::ParallelBridge`] that first collects the iterator +/// into a `Vec`, then calls [`IntoParallelIterator`] on it. This avoids the mutex contention +/// that can occur with `par_bridge` when either the iterator's `next()` method is fast or the +/// parallel tasks are fast, as `par_bridge` wraps the iterator in a mutex. +/// +/// # When to use +/// +/// Use `par_bridge_buffered` instead of `par_bridge` when: +/// - The iterator produces items quickly +/// - The parallel work per item is relatively light +/// - The total number of items is known to be reasonable for memory +/// +/// Stick with `par_bridge` when: +/// - The iterator is slow (e.g., I/O bound) and you want to overlap iteration with processing +/// - Memory is constrained and you cannot afford to collect all items upfront +pub trait ParallelBridgeBuffered: Iterator + Sized { + /// Collects this iterator into a `Vec` and returns a parallel iterator over it. + /// + /// See [this trait's documentation](ParallelBridgeBuffered) for more details. + fn par_bridge_buffered(self) -> rayon::vec::IntoIter { + self.collect::>().into_par_iter() + } +} + +impl> ParallelBridgeBuffered for I {} diff --git a/crates/trie/sparse/Cargo.toml b/crates/trie/sparse/Cargo.toml index 0955f204f9..8c0b6daa16 100644 --- a/crates/trie/sparse/Cargo.toml +++ b/crates/trie/sparse/Cargo.toml @@ -60,6 +60,7 @@ std = [ "alloy-trie/std", "reth-execution-errors/std", "reth-primitives-traits/std", + "reth-primitives-traits/rayon", "reth-storage-api/std", "reth-trie-common/std", "tracing/std", diff --git a/crates/trie/sparse/src/state.rs b/crates/trie/sparse/src/state.rs index fa3bc25027..29ae813b4e 100644 --- a/crates/trie/sparse/src/state.rs +++ b/crates/trie/sparse/src/state.rs @@ -286,7 +286,8 @@ where #[cfg(feature = "std")] // If std then reveal storage proofs in parallel { - use rayon::iter::{ParallelBridge, ParallelIterator}; + use rayon::iter::ParallelIterator; + use reth_primitives_traits::ParallelBridgeBuffered; let retain_updates = self.retain_updates; @@ -300,7 +301,7 @@ where let trie = self.storage.take_or_create_trie(&account); (account, storage_subtree, revealed_nodes, trie) }) - .par_bridge() + .par_bridge_buffered() .map(|(account, storage_subtree, mut revealed_nodes, mut trie)| { let result = Self::reveal_decoded_storage_multiproof_inner( account, @@ -372,7 +373,8 @@ where #[cfg(feature = "std")] // If std then reveal storage proofs in parallel { - use rayon::iter::{ParallelBridge, ParallelIterator}; + use rayon::iter::ParallelIterator; + use reth_primitives_traits::ParallelBridgeBuffered; let retain_updates = self.retain_updates; @@ -387,7 +389,7 @@ where let trie = self.storage.take_or_create_trie(&account); (account, storage_proofs, revealed_nodes, trie) }) - .par_bridge() + .par_bridge_buffered() .map(|(account, storage_proofs, mut revealed_nodes, mut trie)| { let result = Self::reveal_storage_v2_proof_nodes_inner( account, diff --git a/testing/ef-tests/Cargo.toml b/testing/ef-tests/Cargo.toml index e9cf465a98..d27d5bfb03 100644 --- a/testing/ef-tests/Cargo.toml +++ b/testing/ef-tests/Cargo.toml @@ -18,7 +18,7 @@ asm-keccak = ["alloy-primitives/asm-keccak", "revm/asm-keccak"] [dependencies] reth-chainspec.workspace = true reth-ethereum-primitives.workspace = true -reth-primitives-traits.workspace = true +reth-primitives-traits = { workspace = true, features = ["rayon"] } reth-consensus.workspace = true reth-db = { workspace = true, features = ["mdbx", "test-utils", "disable-lock"] } reth-db-api.workspace = true diff --git a/testing/ef-tests/src/cases/blockchain_test.rs b/testing/ef-tests/src/cases/blockchain_test.rs index 8f26ac8dd0..1d7d040aed 100644 --- a/testing/ef-tests/src/cases/blockchain_test.rs +++ b/testing/ef-tests/src/cases/blockchain_test.rs @@ -5,7 +5,7 @@ use crate::{ Case, Error, Suite, }; use alloy_rlp::{Decodable, Encodable}; -use rayon::iter::{ParallelBridge, ParallelIterator}; +use rayon::iter::ParallelIterator; use reth_chainspec::ChainSpec; use reth_consensus::{Consensus, HeaderValidator}; use reth_db_common::init::{insert_genesis_hashes, insert_genesis_history, insert_genesis_state}; @@ -13,7 +13,9 @@ use reth_ethereum_consensus::{validate_block_post_execution, EthBeaconConsensus} use reth_ethereum_primitives::{Block, TransactionSigned}; use reth_evm::{execute::Executor, ConfigureEvm}; use reth_evm_ethereum::EthEvmConfig; -use reth_primitives_traits::{Block as BlockTrait, RecoveredBlock, SealedBlock}; +use reth_primitives_traits::{ + Block as BlockTrait, ParallelBridgeBuffered, RecoveredBlock, SealedBlock, +}; use reth_provider::{ test_utils::create_test_provider_factory_with_chain_spec, BlockWriter, DatabaseProviderFactory, ExecutionOutcome, HeaderProvider, HistoryWriter, OriginalValuesKnown, StateProofProvider, @@ -180,7 +182,7 @@ impl Case for BlockchainTestCase { self.tests .iter() .filter(|(_, case)| !Self::excluded_fork(case.network)) - .par_bridge() + .par_bridge_buffered() .try_for_each(|(name, case)| Self::run_single_case(name, case).map(|_| ()))?; Ok(())