perf: use indexed parallel iterators for tx recovery (#20342)

This commit is contained in:
DaniPopes
2025-12-15 10:40:03 -03:00
committed by GitHub
parent 42c1e1afe1
commit 21216e2f24
16 changed files with 100 additions and 83 deletions

1
Cargo.lock generated
View File

@@ -8639,6 +8639,7 @@ dependencies = [
"derive_more",
"futures-util",
"metrics",
"rayon",
"reth-ethereum-forks",
"reth-ethereum-primitives",
"reth-execution-errors",

View File

@@ -22,7 +22,8 @@ use reth_trie_common::HashedPostState;
use serde::{de::DeserializeOwned, Serialize};
// Re-export [`ExecutionPayload`] moved to `reth_payload_primitives`
pub use reth_evm::{ConfigureEngineEvm, ExecutableTxIterator};
#[cfg(feature = "std")]
pub use reth_evm::{ConfigureEngineEvm, ExecutableTxIterator, ExecutableTxTuple};
pub use reth_payload_primitives::ExecutionPayload;
mod error;

View File

@@ -16,7 +16,7 @@ reth-chain-state.workspace = true
reth-chainspec = { workspace = true, optional = true }
reth-consensus.workspace = true
reth-db.workspace = true
reth-engine-primitives.workspace = true
reth-engine-primitives = { workspace = true, features = ["std"] }
reth-errors.workspace = true
reth-execution-types.workspace = true
reth-evm = { workspace = true, features = ["metrics"] }

View File

@@ -230,12 +230,12 @@ fn bench_state_root(c: &mut Criterion) {
let mut handle = payload_processor.spawn(
Default::default(),
(
core::iter::empty::<
Vec::<
Result<
Recovered<TransactionSigned>,
core::convert::Infallible,
>,
>(),
>::new(),
std::convert::identity,
),
StateProviderBuilder::new(provider.clone(), genesis_hash, None),

View File

@@ -21,7 +21,7 @@ use executor::WorkloadExecutor;
use multiproof::{SparseTrieUpdate, *};
use parking_lot::RwLock;
use prewarm::PrewarmMetrics;
use rayon::iter::{ParallelBridge, ParallelIterator};
use rayon::prelude::*;
use reth_engine_primitives::ExecutableTxIterator;
use reth_evm::{
execute::{ExecutableTxFor, WithTxEnv},
@@ -318,36 +318,32 @@ where
usize,
) {
let (transactions, convert) = transactions.into();
let transactions = transactions.into_iter();
// Get the transaction count for prewarming task
// Use upper bound if available (more accurate), otherwise use lower bound
let (lower, upper) = transactions.size_hint();
let transaction_count_hint = upper.unwrap_or(lower);
let transactions = transactions.into_par_iter();
let transaction_count_hint = transactions.len();
// Spawn a task that iterates through all transactions in parallel and sends them to the
// main task.
let (tx, rx) = mpsc::channel();
let (ooo_tx, ooo_rx) = mpsc::channel();
let (prewarm_tx, prewarm_rx) = mpsc::channel();
let (execute_tx, execute_rx) = mpsc::channel();
// Spawn a task that `convert`s all transactions in parallel and sends them out-of-order.
self.executor.spawn_blocking(move || {
transactions.enumerate().par_bridge().for_each_with(tx, |sender, (idx, tx)| {
transactions.enumerate().for_each_with(ooo_tx, |ooo_tx, (idx, tx)| {
let tx = convert(tx);
let tx = tx.map(|tx| WithTxEnv { tx_env: tx.to_tx_env(), tx: Arc::new(tx) });
let _ = sender.send((idx, tx));
// Only send Ok(_) variants to prewarming task.
if let Ok(tx) = &tx {
let _ = prewarm_tx.send(tx.clone());
}
let _ = ooo_tx.send((idx, tx));
});
});
// Spawn a task that processes out-of-order transactions from the task above and sends them
// to prewarming and execution tasks.
let (prewarm_tx, prewarm_rx) = mpsc::channel();
let (execute_tx, execute_rx) = mpsc::channel();
// to the execution task in order.
self.executor.spawn_blocking(move || {
let mut next_for_execution = 0;
let mut queue = BTreeMap::new();
while let Ok((idx, tx)) = rx.recv() {
// only send Ok(_) variants to prewarming task
if let Ok(tx) = &tx {
let _ = prewarm_tx.send(tx.clone());
}
while let Ok((idx, tx)) = ooo_rx.recv() {
if next_for_execution == idx {
let _ = execute_tx.send(tx);
next_for_execution += 1;
@@ -1057,19 +1053,16 @@ mod tests {
let provider_factory = BlockchainProvider::new(factory).unwrap();
let mut handle =
payload_processor.spawn(
Default::default(),
(
core::iter::empty::<
Result<Recovered<TransactionSigned>, core::convert::Infallible>,
>(),
std::convert::identity,
),
StateProviderBuilder::new(provider_factory.clone(), genesis_hash, None),
OverlayStateProviderFactory::new(provider_factory),
&TreeConfig::default(),
);
let mut handle = payload_processor.spawn(
Default::default(),
(
Vec::<Result<Recovered<TransactionSigned>, core::convert::Infallible>>::new(),
std::convert::identity,
),
StateProviderBuilder::new(provider_factory.clone(), genesis_hash, None),
OverlayStateProviderFactory::new(provider_factory),
&TreeConfig::default(),
);
let mut state_hook = handle.state_hook();

View File

@@ -15,6 +15,7 @@ use alloy_eip7928::BlockAccessList;
use alloy_eips::{eip1898::BlockWithParent, NumHash};
use alloy_evm::Evm;
use alloy_primitives::B256;
use rayon::prelude::*;
use reth_chain_state::{CanonicalInMemoryState, DeferredTrieData, ExecutedBlock};
use reth_consensus::{ConsensusError, FullConsensus};
use reth_engine_primitives::{
@@ -221,7 +222,7 @@ where
.map_err(NewPayloadError::other)?
.into();
let iter = Either::Left(iter.into_iter().map(Either::Left));
let iter = Either::Left(iter.into_par_iter().map(Either::Left));
let convert = move |tx| {
let Either::Left(tx) = tx else { unreachable!() };
convert(tx).map(Either::Left).map_err(Either::Left)
@@ -231,8 +232,9 @@ where
Ok((iter, Box::new(convert) as Box<dyn Fn(_) -> _ + Send + Sync + 'static>))
}
BlockOrPayload::Block(block) => {
let iter =
Either::Right(block.body().clone_transactions().into_iter().map(Either::Right));
let iter = Either::Right(
block.body().clone_transactions().into_par_iter().map(Either::Right),
);
let convert = move |tx: Either<_, N::SignedTx>| {
let Either::Right(tx) = tx else { unreachable!() };
tx.try_into_recovered().map(Either::Right).map_err(Either::Right)

View File

@@ -16,7 +16,7 @@ reth-primitives-traits.workspace = true
reth-errors.workspace = true
reth-chainspec.workspace = true
reth-fs-util.workspace = true
reth-engine-primitives.workspace = true
reth-engine-primitives = { workspace = true, features = ["std"] }
reth-engine-tree.workspace = true
reth-evm.workspace = true
reth-revm.workspace = true

View File

@@ -19,32 +19,37 @@ extern crate alloc;
use alloc::{borrow::Cow, sync::Arc};
use alloy_consensus::Header;
use alloy_eips::Decodable2718;
pub use alloy_evm::EthEvm;
use alloy_evm::{
eth::{EthBlockExecutionCtx, EthBlockExecutorFactory},
EthEvmFactory, FromRecoveredTx, FromTxWithEncoded,
};
use alloy_primitives::{Bytes, U256};
use alloy_rpc_types_engine::ExecutionData;
use core::{convert::Infallible, fmt::Debug};
use reth_chainspec::{ChainSpec, EthChainSpec, EthereumHardforks, MAINNET};
use reth_chainspec::{ChainSpec, EthChainSpec, MAINNET};
use reth_ethereum_primitives::{Block, EthPrimitives, TransactionSigned};
use reth_evm::{
eth::NextEvmEnvAttributes, precompiles::PrecompilesMap, ConfigureEngineEvm, ConfigureEvm,
EvmEnv, EvmEnvFor, EvmFactory, ExecutableTxIterator, ExecutionCtxFor, NextBlockEnvAttributes,
TransactionEnv,
eth::NextEvmEnvAttributes, precompiles::PrecompilesMap, ConfigureEvm, EvmEnv, EvmFactory,
NextBlockEnvAttributes, TransactionEnv,
};
use reth_primitives_traits::{
constants::MAX_TX_GAS_LIMIT_OSAKA, SealedBlock, SealedHeader, SignedTransaction, TxTy,
};
use reth_storage_errors::any::AnyError;
use revm::{
context::{BlockEnv, CfgEnv},
context_interface::block::BlobExcessGasAndPrice,
primitives::hardfork::SpecId,
use reth_primitives_traits::{SealedBlock, SealedHeader};
use revm::{context::BlockEnv, primitives::hardfork::SpecId};
#[cfg(feature = "std")]
use reth_evm::{ConfigureEngineEvm, ExecutableTxIterator};
#[allow(unused_imports)]
use {
alloy_eips::Decodable2718,
alloy_primitives::{Bytes, U256},
alloy_rpc_types_engine::ExecutionData,
reth_chainspec::EthereumHardforks,
reth_evm::{EvmEnvFor, ExecutionCtxFor},
reth_primitives_traits::{constants::MAX_TX_GAS_LIMIT_OSAKA, SignedTransaction, TxTy},
reth_storage_errors::any::AnyError,
revm::context::CfgEnv,
revm::context_interface::block::BlobExcessGasAndPrice,
};
pub use alloy_evm::EthEvm;
mod config;
use alloy_evm::eth::spec::EthExecutorSpec;
pub use config::{revm_spec, revm_spec_by_timestamp_and_block_number};
@@ -206,6 +211,7 @@ where
}
}
#[cfg(feature = "std")]
impl<ChainSpec, EvmF> ConfigureEngineEvm<ExecutionData> for EthEvmConfig<ChainSpec, EvmF>
where
ChainSpec: EthExecutorSpec + EthChainSpec<Header = Header> + Hardforks + 'static,
@@ -286,7 +292,7 @@ where
&self,
payload: &ExecutionData,
) -> Result<impl ExecutableTxIterator<Self>, Self::Error> {
let txs = payload.payload.transactions().clone().into_iter();
let txs = payload.payload.transactions().clone();
let convert = |tx: Bytes| {
let tx =
TxTy::<Self::Primitives>::decode_2718_exact(tx.as_ref()).map_err(AnyError::new)?;

View File

@@ -24,7 +24,7 @@ reth-provider.workspace = true
reth-transaction-pool.workspace = true
reth-network.workspace = true
reth-evm.workspace = true
reth-evm-ethereum.workspace = true
reth-evm-ethereum = { workspace = true, features = ["std"] }
reth-rpc.workspace = true
reth-rpc-api.workspace = true
reth-rpc-eth-api.workspace = true
@@ -35,7 +35,7 @@ reth-chainspec.workspace = true
reth-revm = { workspace = true, features = ["std"] }
reth-rpc-eth-types.workspace = true
reth-engine-local.workspace = true
reth-engine-primitives.workspace = true
reth-engine-primitives = { workspace = true, features = ["std"] }
reth-payload-primitives.workspace = true
# ethereum

View File

@@ -24,7 +24,7 @@ reth-payload-builder-primitives.workspace = true
reth-payload-primitives.workspace = true
reth-basic-payload-builder.workspace = true
reth-evm.workspace = true
reth-evm-ethereum.workspace = true
reth-evm-ethereum = { workspace = true, features = ["std"] }
reth-errors.workspace = true
reth-chainspec.workspace = true
reth-payload-validator.workspace = true

View File

@@ -32,6 +32,7 @@ auto_impl.workspace = true
derive_more.workspace = true
futures-util.workspace = true
metrics = { workspace = true, optional = true }
rayon = { workspace = true, optional = true }
[dev-dependencies]
reth-ethereum-primitives.workspace = true
@@ -40,6 +41,7 @@ reth-ethereum-forks.workspace = true
[features]
default = ["std"]
std = [
"dep:rayon",
"reth-primitives-traits/std",
"alloy-eips/std",
"alloy-primitives/std",

View File

@@ -1,4 +1,5 @@
use crate::{execute::ExecutableTxFor, ConfigureEvm, EvmEnvFor, ExecutionCtxFor};
use rayon::prelude::*;
/// [`ConfigureEvm`] extension providing methods for executing payloads.
pub trait ConfigureEngineEvm<ExecutionData>: ConfigureEvm {
@@ -21,7 +22,7 @@ pub trait ConfigureEngineEvm<ExecutionData>: ConfigureEvm {
/// A helper trait representing a pair of a "raw" transactions iterator and a closure that can be
/// used to convert them to an executable transaction. This tuple is used in the engine to
/// parallelize heavy work like decoding or recovery.
pub trait ExecutableTxTuple: Into<(Self::Iter, Self::Convert)> + Send + 'static {
pub trait ExecutableTxTuple: Into<(Self::IntoIter, Self::Convert)> + Send + 'static {
/// Raw transaction that can be converted to an [`ExecutableTxTuple::Tx`]
///
/// This can be any type that can be converted to an [`ExecutableTxTuple::Tx`]. For example,
@@ -32,8 +33,10 @@ pub trait ExecutableTxTuple: Into<(Self::Iter, Self::Convert)> + Send + 'static
/// Errors that may occur while recovering or decoding transactions.
type Error: core::error::Error + Send + Sync + 'static;
/// Iterator over [`ExecutableTxTuple::Tx`]
type Iter: Iterator<Item = Self::RawTx> + Send + 'static;
/// Iterator over [`ExecutableTxTuple::Tx`].
type IntoIter: IntoParallelIterator<Item = Self::RawTx, Iter: IndexedParallelIterator>
+ Send
+ 'static;
/// Closure that can be used to convert a [`ExecutableTxTuple::RawTx`] to a
/// [`ExecutableTxTuple::Tx`]. This might involve heavy work like decoding or recovery
/// and will be parallelized in the engine.
@@ -45,14 +48,14 @@ where
RawTx: Send + Sync + 'static,
Tx: Clone + Send + Sync + 'static,
Err: core::error::Error + Send + Sync + 'static,
I: Iterator<Item = RawTx> + Send + 'static,
I: IntoParallelIterator<Item = RawTx, Iter: IndexedParallelIterator> + Send + 'static,
F: Fn(RawTx) -> Result<Tx, Err> + Send + Sync + 'static,
{
type RawTx = RawTx;
type Tx = Tx;
type Error = Err;
type Iter = I;
type IntoIter = I;
type Convert = F;
}

View File

@@ -44,8 +44,10 @@ pub mod execute;
mod aliases;
pub use aliases::*;
#[cfg(feature = "std")]
mod engine;
pub use engine::{ConfigureEngineEvm, ExecutableTxIterator};
#[cfg(feature = "std")]
pub use engine::{ConfigureEngineEvm, ExecutableTxIterator, ExecutableTxTuple};
#[cfg(feature = "metrics")]
pub mod metrics;

View File

@@ -13,32 +13,38 @@ extern crate alloc;
use alloc::sync::Arc;
use alloy_consensus::{BlockHeader, Header};
use alloy_eips::Decodable2718;
use alloy_evm::{EvmFactory, FromRecoveredTx, FromTxWithEncoded};
use alloy_op_evm::block::{receipt_builder::OpReceiptBuilder, OpTxEnv};
use alloy_primitives::{Bytes, U256};
use core::fmt::Debug;
use op_alloy_consensus::EIP1559ParamError;
use op_alloy_rpc_types_engine::OpExecutionData;
use op_revm::{OpSpecId, OpTransaction};
use reth_chainspec::EthChainSpec;
use reth_evm::{
eth::NextEvmEnvAttributes, precompiles::PrecompilesMap, ConfigureEngineEvm, ConfigureEvm,
EvmEnv, EvmEnvFor, ExecutableTxIterator, ExecutionCtxFor, TransactionEnv,
eth::NextEvmEnvAttributes, precompiles::PrecompilesMap, ConfigureEvm, EvmEnv, TransactionEnv,
};
use reth_optimism_chainspec::OpChainSpec;
use reth_optimism_forks::OpHardforks;
use reth_optimism_primitives::{DepositReceipt, OpPrimitives};
use reth_primitives_traits::{
NodePrimitives, SealedBlock, SealedHeader, SignedTransaction, TxTy, WithEncoded,
};
use reth_storage_errors::any::AnyError;
use revm::{
context::{BlockEnv, CfgEnv, TxEnv},
context_interface::block::BlobExcessGasAndPrice,
primitives::hardfork::SpecId,
use reth_primitives_traits::{NodePrimitives, SealedBlock, SealedHeader, SignedTransaction};
use revm::context::{BlockEnv, TxEnv};
#[allow(unused_imports)]
use {
alloy_eips::Decodable2718,
alloy_primitives::{Bytes, U256},
op_alloy_rpc_types_engine::OpExecutionData,
reth_evm::{EvmEnvFor, ExecutionCtxFor},
reth_primitives_traits::{TxTy, WithEncoded},
reth_storage_errors::any::AnyError,
revm::{
context::CfgEnv, context_interface::block::BlobExcessGasAndPrice,
primitives::hardfork::SpecId,
},
};
#[cfg(feature = "std")]
use reth_evm::{ConfigureEngineEvm, ExecutableTxIterator};
mod config;
pub use config::{revm_spec, revm_spec_by_timestamp_after_bedrock, OpNextBlockEnvAttributes};
mod execute;
@@ -200,6 +206,7 @@ where
}
}
#[cfg(feature = "std")]
impl<ChainSpec, N, R> ConfigureEngineEvm<OpExecutionData> for OpEvmConfig<ChainSpec, N, R>
where
ChainSpec: EthChainSpec<Header = Header> + OpHardforks,
@@ -265,7 +272,7 @@ where
&self,
payload: &OpExecutionData,
) -> Result<impl ExecutableTxIterator<Self>, Self::Error> {
let transactions = payload.payload.transactions().clone().into_iter();
let transactions = payload.payload.transactions().clone();
let convert = |encoded: Bytes| {
let tx = TxTy::<Self::Primitives>::decode_2718_exact(encoded.as_ref())
.map_err(AnyError::new)?;

View File

@@ -34,7 +34,7 @@ reth-rpc-api.workspace = true
# op-reth
reth-optimism-payload-builder.workspace = true
reth-optimism-evm = { workspace = true, features = ["rpc"] }
reth-optimism-evm = { workspace = true, features = ["std", "rpc"] }
reth-optimism-rpc.workspace = true
reth-optimism-storage.workspace = true
reth-optimism-txpool.workspace = true

View File

@@ -127,7 +127,7 @@ impl ConfigureEngineEvm<CustomExecutionData> for CustomEvmConfig {
&self,
payload: &CustomExecutionData,
) -> Result<impl ExecutableTxIterator<Self>, Self::Error> {
let transactions = payload.inner.payload.transactions().clone().into_iter();
let transactions = payload.inner.payload.transactions().clone();
let convert = |encoded: Bytes| {
let tx = CustomTransaction::decode_2718_exact(encoded.as_ref())
.map_err(Into::into)