perf: add ParallelBridgeBuffered trait to replace par_bridge (#21674)

This commit is contained in:
DaniPopes
2026-02-02 01:58:43 +01:00
committed by GitHub
parent 5ef32726db
commit 28409558f9
8 changed files with 55 additions and 12 deletions

View File

@@ -23,7 +23,7 @@ reth-evm = { workspace = true, features = ["metrics"] }
reth-network-p2p.workspace = true
reth-payload-builder.workspace = true
reth-payload-primitives.workspace = true
reth-primitives-traits.workspace = true
reth-primitives-traits = { workspace = true, features = ["rayon"] }
reth-ethereum-primitives.workspace = true
reth-provider.workspace = true
reth-prune.workspace = true

View File

@@ -7,9 +7,9 @@ use crate::tree::{
use alloy_primitives::B256;
use alloy_rlp::{Decodable, Encodable};
use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
use rayon::iter::{ParallelBridge, ParallelIterator};
use rayon::iter::ParallelIterator;
use reth_errors::ProviderError;
use reth_primitives_traits::Account;
use reth_primitives_traits::{Account, ParallelBridgeBuffered};
use reth_revm::state::EvmState;
use reth_trie::{
proof_v2::Target, updates::TrieUpdates, HashedPostState, Nibbles, TrieAccount, EMPTY_ROOT_HASH,
@@ -657,7 +657,7 @@ where
.storages
.into_iter()
.map(|(address, storage)| (address, storage, trie.take_storage_trie(&address)))
.par_bridge()
.par_bridge_buffered()
.map(|(address, storage, storage_trie)| {
let _enter =
debug_span!(target: "engine::tree::payload_processor::sparse_trie", parent: &span, "storage trie", ?address)

View File

@@ -188,6 +188,12 @@ pub mod serde_bincode_compat;
pub mod size;
pub use size::InMemorySize;
/// Rayon utilities
#[cfg(feature = "rayon")]
pub mod rayon;
#[cfg(feature = "rayon")]
pub use rayon::ParallelBridgeBuffered;
/// Node traits
pub mod node;
pub use node::{BlockTy, BodyTy, HeaderTy, NodePrimitives, ReceiptTy, TxTy};

View File

@@ -0,0 +1,32 @@
//! Rayon parallel iterator utilities.
use alloc::vec::Vec;
use rayon::iter::IntoParallelIterator;
/// Extension trait for iterators to convert them to parallel iterators via collection.
///
/// This is an alternative to [`rayon::iter::ParallelBridge`] that first collects the iterator
/// into a `Vec`, then calls [`IntoParallelIterator`] on it. This avoids the mutex contention
/// that can occur with `par_bridge` when either the iterator's `next()` method is fast or the
/// parallel tasks are fast, as `par_bridge` wraps the iterator in a mutex.
///
/// # When to use
///
/// Use `par_bridge_buffered` instead of `par_bridge` when:
/// - The iterator produces items quickly
/// - The parallel work per item is relatively light
/// - The total number of items is known to be reasonable for memory
///
/// Stick with `par_bridge` when:
/// - The iterator is slow (e.g., I/O bound) and you want to overlap iteration with processing
/// - Memory is constrained and you cannot afford to collect all items upfront
pub trait ParallelBridgeBuffered: Iterator<Item: Send> + Sized {
/// Collects this iterator into a `Vec` and returns a parallel iterator over it.
///
/// See [this trait's documentation](ParallelBridgeBuffered) for more details.
fn par_bridge_buffered(self) -> rayon::vec::IntoIter<Self::Item> {
self.collect::<Vec<_>>().into_par_iter()
}
}
impl<I: Iterator<Item: Send>> ParallelBridgeBuffered for I {}

View File

@@ -60,6 +60,7 @@ std = [
"alloy-trie/std",
"reth-execution-errors/std",
"reth-primitives-traits/std",
"reth-primitives-traits/rayon",
"reth-storage-api/std",
"reth-trie-common/std",
"tracing/std",

View File

@@ -286,7 +286,8 @@ where
#[cfg(feature = "std")]
// If std then reveal storage proofs in parallel
{
use rayon::iter::{ParallelBridge, ParallelIterator};
use rayon::iter::ParallelIterator;
use reth_primitives_traits::ParallelBridgeBuffered;
let retain_updates = self.retain_updates;
@@ -300,7 +301,7 @@ where
let trie = self.storage.take_or_create_trie(&account);
(account, storage_subtree, revealed_nodes, trie)
})
.par_bridge()
.par_bridge_buffered()
.map(|(account, storage_subtree, mut revealed_nodes, mut trie)| {
let result = Self::reveal_decoded_storage_multiproof_inner(
account,
@@ -372,7 +373,8 @@ where
#[cfg(feature = "std")]
// If std then reveal storage proofs in parallel
{
use rayon::iter::{ParallelBridge, ParallelIterator};
use rayon::iter::ParallelIterator;
use reth_primitives_traits::ParallelBridgeBuffered;
let retain_updates = self.retain_updates;
@@ -387,7 +389,7 @@ where
let trie = self.storage.take_or_create_trie(&account);
(account, storage_proofs, revealed_nodes, trie)
})
.par_bridge()
.par_bridge_buffered()
.map(|(account, storage_proofs, mut revealed_nodes, mut trie)| {
let result = Self::reveal_storage_v2_proof_nodes_inner(
account,

View File

@@ -18,7 +18,7 @@ asm-keccak = ["alloy-primitives/asm-keccak", "revm/asm-keccak"]
[dependencies]
reth-chainspec.workspace = true
reth-ethereum-primitives.workspace = true
reth-primitives-traits.workspace = true
reth-primitives-traits = { workspace = true, features = ["rayon"] }
reth-consensus.workspace = true
reth-db = { workspace = true, features = ["mdbx", "test-utils", "disable-lock"] }
reth-db-api.workspace = true

View File

@@ -5,7 +5,7 @@ use crate::{
Case, Error, Suite,
};
use alloy_rlp::{Decodable, Encodable};
use rayon::iter::{ParallelBridge, ParallelIterator};
use rayon::iter::ParallelIterator;
use reth_chainspec::ChainSpec;
use reth_consensus::{Consensus, HeaderValidator};
use reth_db_common::init::{insert_genesis_hashes, insert_genesis_history, insert_genesis_state};
@@ -13,7 +13,9 @@ use reth_ethereum_consensus::{validate_block_post_execution, EthBeaconConsensus}
use reth_ethereum_primitives::{Block, TransactionSigned};
use reth_evm::{execute::Executor, ConfigureEvm};
use reth_evm_ethereum::EthEvmConfig;
use reth_primitives_traits::{Block as BlockTrait, RecoveredBlock, SealedBlock};
use reth_primitives_traits::{
Block as BlockTrait, ParallelBridgeBuffered, RecoveredBlock, SealedBlock,
};
use reth_provider::{
test_utils::create_test_provider_factory_with_chain_spec, BlockWriter, DatabaseProviderFactory,
ExecutionOutcome, HeaderProvider, HistoryWriter, OriginalValuesKnown, StateProofProvider,
@@ -180,7 +182,7 @@ impl Case for BlockchainTestCase {
self.tests
.iter()
.filter(|(_, case)| !Self::excluded_fork(case.network))
.par_bridge()
.par_bridge_buffered()
.try_for_each(|(name, case)| Self::run_single_case(name, case).map(|_| ()))?;
Ok(())