diff --git a/Cargo.lock b/Cargo.lock index a182884db1..770a7f26d1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8639,6 +8639,7 @@ dependencies = [ "derive_more", "futures-util", "metrics", + "rayon", "reth-ethereum-forks", "reth-ethereum-primitives", "reth-execution-errors", diff --git a/crates/engine/primitives/src/lib.rs b/crates/engine/primitives/src/lib.rs index 0c5dcc6c22..a16b5b1c45 100644 --- a/crates/engine/primitives/src/lib.rs +++ b/crates/engine/primitives/src/lib.rs @@ -22,7 +22,8 @@ use reth_trie_common::HashedPostState; use serde::{de::DeserializeOwned, Serialize}; // Re-export [`ExecutionPayload`] moved to `reth_payload_primitives` -pub use reth_evm::{ConfigureEngineEvm, ExecutableTxIterator}; +#[cfg(feature = "std")] +pub use reth_evm::{ConfigureEngineEvm, ExecutableTxIterator, ExecutableTxTuple}; pub use reth_payload_primitives::ExecutionPayload; mod error; diff --git a/crates/engine/tree/Cargo.toml b/crates/engine/tree/Cargo.toml index 2b281b90b2..1b558ebf89 100644 --- a/crates/engine/tree/Cargo.toml +++ b/crates/engine/tree/Cargo.toml @@ -16,7 +16,7 @@ reth-chain-state.workspace = true reth-chainspec = { workspace = true, optional = true } reth-consensus.workspace = true reth-db.workspace = true -reth-engine-primitives.workspace = true +reth-engine-primitives = { workspace = true, features = ["std"] } reth-errors.workspace = true reth-execution-types.workspace = true reth-evm = { workspace = true, features = ["metrics"] } diff --git a/crates/engine/tree/benches/state_root_task.rs b/crates/engine/tree/benches/state_root_task.rs index a99562923b..8bda39a4eb 100644 --- a/crates/engine/tree/benches/state_root_task.rs +++ b/crates/engine/tree/benches/state_root_task.rs @@ -230,12 +230,12 @@ fn bench_state_root(c: &mut Criterion) { let mut handle = payload_processor.spawn( Default::default(), ( - core::iter::empty::< + Vec::< Result< Recovered, core::convert::Infallible, >, - >(), + >::new(), std::convert::identity, ), StateProviderBuilder::new(provider.clone(), genesis_hash, None), diff --git a/crates/engine/tree/src/tree/payload_processor/mod.rs b/crates/engine/tree/src/tree/payload_processor/mod.rs index b94c2bce91..731f956eaa 100644 --- a/crates/engine/tree/src/tree/payload_processor/mod.rs +++ b/crates/engine/tree/src/tree/payload_processor/mod.rs @@ -21,7 +21,7 @@ use executor::WorkloadExecutor; use multiproof::{SparseTrieUpdate, *}; use parking_lot::RwLock; use prewarm::PrewarmMetrics; -use rayon::iter::{ParallelBridge, ParallelIterator}; +use rayon::prelude::*; use reth_engine_primitives::ExecutableTxIterator; use reth_evm::{ execute::{ExecutableTxFor, WithTxEnv}, @@ -318,36 +318,32 @@ where usize, ) { let (transactions, convert) = transactions.into(); - let transactions = transactions.into_iter(); - // Get the transaction count for prewarming task - // Use upper bound if available (more accurate), otherwise use lower bound - let (lower, upper) = transactions.size_hint(); - let transaction_count_hint = upper.unwrap_or(lower); + let transactions = transactions.into_par_iter(); + let transaction_count_hint = transactions.len(); - // Spawn a task that iterates through all transactions in parallel and sends them to the - // main task. - let (tx, rx) = mpsc::channel(); + let (ooo_tx, ooo_rx) = mpsc::channel(); + let (prewarm_tx, prewarm_rx) = mpsc::channel(); + let (execute_tx, execute_rx) = mpsc::channel(); + + // Spawn a task that `convert`s all transactions in parallel and sends them out-of-order. self.executor.spawn_blocking(move || { - transactions.enumerate().par_bridge().for_each_with(tx, |sender, (idx, tx)| { + transactions.enumerate().for_each_with(ooo_tx, |ooo_tx, (idx, tx)| { let tx = convert(tx); let tx = tx.map(|tx| WithTxEnv { tx_env: tx.to_tx_env(), tx: Arc::new(tx) }); - let _ = sender.send((idx, tx)); + // Only send Ok(_) variants to prewarming task. + if let Ok(tx) = &tx { + let _ = prewarm_tx.send(tx.clone()); + } + let _ = ooo_tx.send((idx, tx)); }); }); // Spawn a task that processes out-of-order transactions from the task above and sends them - // to prewarming and execution tasks. - let (prewarm_tx, prewarm_rx) = mpsc::channel(); - let (execute_tx, execute_rx) = mpsc::channel(); + // to the execution task in order. self.executor.spawn_blocking(move || { let mut next_for_execution = 0; let mut queue = BTreeMap::new(); - while let Ok((idx, tx)) = rx.recv() { - // only send Ok(_) variants to prewarming task - if let Ok(tx) = &tx { - let _ = prewarm_tx.send(tx.clone()); - } - + while let Ok((idx, tx)) = ooo_rx.recv() { if next_for_execution == idx { let _ = execute_tx.send(tx); next_for_execution += 1; @@ -1057,19 +1053,16 @@ mod tests { let provider_factory = BlockchainProvider::new(factory).unwrap(); - let mut handle = - payload_processor.spawn( - Default::default(), - ( - core::iter::empty::< - Result, core::convert::Infallible>, - >(), - std::convert::identity, - ), - StateProviderBuilder::new(provider_factory.clone(), genesis_hash, None), - OverlayStateProviderFactory::new(provider_factory), - &TreeConfig::default(), - ); + let mut handle = payload_processor.spawn( + Default::default(), + ( + Vec::, core::convert::Infallible>>::new(), + std::convert::identity, + ), + StateProviderBuilder::new(provider_factory.clone(), genesis_hash, None), + OverlayStateProviderFactory::new(provider_factory), + &TreeConfig::default(), + ); let mut state_hook = handle.state_hook(); diff --git a/crates/engine/tree/src/tree/payload_validator.rs b/crates/engine/tree/src/tree/payload_validator.rs index a8f50b98c9..1ba2a1d1b8 100644 --- a/crates/engine/tree/src/tree/payload_validator.rs +++ b/crates/engine/tree/src/tree/payload_validator.rs @@ -15,6 +15,7 @@ use alloy_eip7928::BlockAccessList; use alloy_eips::{eip1898::BlockWithParent, NumHash}; use alloy_evm::Evm; use alloy_primitives::B256; +use rayon::prelude::*; use reth_chain_state::{CanonicalInMemoryState, DeferredTrieData, ExecutedBlock}; use reth_consensus::{ConsensusError, FullConsensus}; use reth_engine_primitives::{ @@ -221,7 +222,7 @@ where .map_err(NewPayloadError::other)? .into(); - let iter = Either::Left(iter.into_iter().map(Either::Left)); + let iter = Either::Left(iter.into_par_iter().map(Either::Left)); let convert = move |tx| { let Either::Left(tx) = tx else { unreachable!() }; convert(tx).map(Either::Left).map_err(Either::Left) @@ -231,8 +232,9 @@ where Ok((iter, Box::new(convert) as Box _ + Send + Sync + 'static>)) } BlockOrPayload::Block(block) => { - let iter = - Either::Right(block.body().clone_transactions().into_iter().map(Either::Right)); + let iter = Either::Right( + block.body().clone_transactions().into_par_iter().map(Either::Right), + ); let convert = move |tx: Either<_, N::SignedTx>| { let Either::Right(tx) = tx else { unreachable!() }; tx.try_into_recovered().map(Either::Right).map_err(Either::Right) diff --git a/crates/engine/util/Cargo.toml b/crates/engine/util/Cargo.toml index 58ee6ac255..ca7382c192 100644 --- a/crates/engine/util/Cargo.toml +++ b/crates/engine/util/Cargo.toml @@ -16,7 +16,7 @@ reth-primitives-traits.workspace = true reth-errors.workspace = true reth-chainspec.workspace = true reth-fs-util.workspace = true -reth-engine-primitives.workspace = true +reth-engine-primitives = { workspace = true, features = ["std"] } reth-engine-tree.workspace = true reth-evm.workspace = true reth-revm.workspace = true diff --git a/crates/ethereum/evm/src/lib.rs b/crates/ethereum/evm/src/lib.rs index 094400f918..dbc686fe4f 100644 --- a/crates/ethereum/evm/src/lib.rs +++ b/crates/ethereum/evm/src/lib.rs @@ -19,32 +19,37 @@ extern crate alloc; use alloc::{borrow::Cow, sync::Arc}; use alloy_consensus::Header; -use alloy_eips::Decodable2718; -pub use alloy_evm::EthEvm; use alloy_evm::{ eth::{EthBlockExecutionCtx, EthBlockExecutorFactory}, EthEvmFactory, FromRecoveredTx, FromTxWithEncoded, }; -use alloy_primitives::{Bytes, U256}; -use alloy_rpc_types_engine::ExecutionData; use core::{convert::Infallible, fmt::Debug}; -use reth_chainspec::{ChainSpec, EthChainSpec, EthereumHardforks, MAINNET}; +use reth_chainspec::{ChainSpec, EthChainSpec, MAINNET}; use reth_ethereum_primitives::{Block, EthPrimitives, TransactionSigned}; use reth_evm::{ - eth::NextEvmEnvAttributes, precompiles::PrecompilesMap, ConfigureEngineEvm, ConfigureEvm, - EvmEnv, EvmEnvFor, EvmFactory, ExecutableTxIterator, ExecutionCtxFor, NextBlockEnvAttributes, - TransactionEnv, + eth::NextEvmEnvAttributes, precompiles::PrecompilesMap, ConfigureEvm, EvmEnv, EvmFactory, + NextBlockEnvAttributes, TransactionEnv, }; -use reth_primitives_traits::{ - constants::MAX_TX_GAS_LIMIT_OSAKA, SealedBlock, SealedHeader, SignedTransaction, TxTy, -}; -use reth_storage_errors::any::AnyError; -use revm::{ - context::{BlockEnv, CfgEnv}, - context_interface::block::BlobExcessGasAndPrice, - primitives::hardfork::SpecId, +use reth_primitives_traits::{SealedBlock, SealedHeader}; +use revm::{context::BlockEnv, primitives::hardfork::SpecId}; + +#[cfg(feature = "std")] +use reth_evm::{ConfigureEngineEvm, ExecutableTxIterator}; +#[allow(unused_imports)] +use { + alloy_eips::Decodable2718, + alloy_primitives::{Bytes, U256}, + alloy_rpc_types_engine::ExecutionData, + reth_chainspec::EthereumHardforks, + reth_evm::{EvmEnvFor, ExecutionCtxFor}, + reth_primitives_traits::{constants::MAX_TX_GAS_LIMIT_OSAKA, SignedTransaction, TxTy}, + reth_storage_errors::any::AnyError, + revm::context::CfgEnv, + revm::context_interface::block::BlobExcessGasAndPrice, }; +pub use alloy_evm::EthEvm; + mod config; use alloy_evm::eth::spec::EthExecutorSpec; pub use config::{revm_spec, revm_spec_by_timestamp_and_block_number}; @@ -206,6 +211,7 @@ where } } +#[cfg(feature = "std")] impl ConfigureEngineEvm for EthEvmConfig where ChainSpec: EthExecutorSpec + EthChainSpec
+ Hardforks + 'static, @@ -286,7 +292,7 @@ where &self, payload: &ExecutionData, ) -> Result, Self::Error> { - let txs = payload.payload.transactions().clone().into_iter(); + let txs = payload.payload.transactions().clone(); let convert = |tx: Bytes| { let tx = TxTy::::decode_2718_exact(tx.as_ref()).map_err(AnyError::new)?; diff --git a/crates/ethereum/node/Cargo.toml b/crates/ethereum/node/Cargo.toml index 0209dcb70a..02d1fc4b83 100644 --- a/crates/ethereum/node/Cargo.toml +++ b/crates/ethereum/node/Cargo.toml @@ -24,7 +24,7 @@ reth-provider.workspace = true reth-transaction-pool.workspace = true reth-network.workspace = true reth-evm.workspace = true -reth-evm-ethereum.workspace = true +reth-evm-ethereum = { workspace = true, features = ["std"] } reth-rpc.workspace = true reth-rpc-api.workspace = true reth-rpc-eth-api.workspace = true @@ -35,7 +35,7 @@ reth-chainspec.workspace = true reth-revm = { workspace = true, features = ["std"] } reth-rpc-eth-types.workspace = true reth-engine-local.workspace = true -reth-engine-primitives.workspace = true +reth-engine-primitives = { workspace = true, features = ["std"] } reth-payload-primitives.workspace = true # ethereum diff --git a/crates/ethereum/payload/Cargo.toml b/crates/ethereum/payload/Cargo.toml index 42d159fb84..4326d9b193 100644 --- a/crates/ethereum/payload/Cargo.toml +++ b/crates/ethereum/payload/Cargo.toml @@ -24,7 +24,7 @@ reth-payload-builder-primitives.workspace = true reth-payload-primitives.workspace = true reth-basic-payload-builder.workspace = true reth-evm.workspace = true -reth-evm-ethereum.workspace = true +reth-evm-ethereum = { workspace = true, features = ["std"] } reth-errors.workspace = true reth-chainspec.workspace = true reth-payload-validator.workspace = true diff --git a/crates/evm/evm/Cargo.toml b/crates/evm/evm/Cargo.toml index 4bc8ef06db..ce75269838 100644 --- a/crates/evm/evm/Cargo.toml +++ b/crates/evm/evm/Cargo.toml @@ -32,6 +32,7 @@ auto_impl.workspace = true derive_more.workspace = true futures-util.workspace = true metrics = { workspace = true, optional = true } +rayon = { workspace = true, optional = true } [dev-dependencies] reth-ethereum-primitives.workspace = true @@ -40,6 +41,7 @@ reth-ethereum-forks.workspace = true [features] default = ["std"] std = [ + "dep:rayon", "reth-primitives-traits/std", "alloy-eips/std", "alloy-primitives/std", diff --git a/crates/evm/evm/src/engine.rs b/crates/evm/evm/src/engine.rs index 48fa55162b..5663745f45 100644 --- a/crates/evm/evm/src/engine.rs +++ b/crates/evm/evm/src/engine.rs @@ -1,4 +1,5 @@ use crate::{execute::ExecutableTxFor, ConfigureEvm, EvmEnvFor, ExecutionCtxFor}; +use rayon::prelude::*; /// [`ConfigureEvm`] extension providing methods for executing payloads. pub trait ConfigureEngineEvm: ConfigureEvm { @@ -21,7 +22,7 @@ pub trait ConfigureEngineEvm: ConfigureEvm { /// A helper trait representing a pair of a "raw" transactions iterator and a closure that can be /// used to convert them to an executable transaction. This tuple is used in the engine to /// parallelize heavy work like decoding or recovery. -pub trait ExecutableTxTuple: Into<(Self::Iter, Self::Convert)> + Send + 'static { +pub trait ExecutableTxTuple: Into<(Self::IntoIter, Self::Convert)> + Send + 'static { /// Raw transaction that can be converted to an [`ExecutableTxTuple::Tx`] /// /// This can be any type that can be converted to an [`ExecutableTxTuple::Tx`]. For example, @@ -32,8 +33,10 @@ pub trait ExecutableTxTuple: Into<(Self::Iter, Self::Convert)> + Send + 'static /// Errors that may occur while recovering or decoding transactions. type Error: core::error::Error + Send + Sync + 'static; - /// Iterator over [`ExecutableTxTuple::Tx`] - type Iter: Iterator + Send + 'static; + /// Iterator over [`ExecutableTxTuple::Tx`]. + type IntoIter: IntoParallelIterator + + Send + + 'static; /// Closure that can be used to convert a [`ExecutableTxTuple::RawTx`] to a /// [`ExecutableTxTuple::Tx`]. This might involve heavy work like decoding or recovery /// and will be parallelized in the engine. @@ -45,14 +48,14 @@ where RawTx: Send + Sync + 'static, Tx: Clone + Send + Sync + 'static, Err: core::error::Error + Send + Sync + 'static, - I: Iterator + Send + 'static, + I: IntoParallelIterator + Send + 'static, F: Fn(RawTx) -> Result + Send + Sync + 'static, { type RawTx = RawTx; type Tx = Tx; type Error = Err; - type Iter = I; + type IntoIter = I; type Convert = F; } diff --git a/crates/evm/evm/src/lib.rs b/crates/evm/evm/src/lib.rs index aca8726b24..935e620d6f 100644 --- a/crates/evm/evm/src/lib.rs +++ b/crates/evm/evm/src/lib.rs @@ -44,8 +44,10 @@ pub mod execute; mod aliases; pub use aliases::*; +#[cfg(feature = "std")] mod engine; -pub use engine::{ConfigureEngineEvm, ExecutableTxIterator}; +#[cfg(feature = "std")] +pub use engine::{ConfigureEngineEvm, ExecutableTxIterator, ExecutableTxTuple}; #[cfg(feature = "metrics")] pub mod metrics; diff --git a/crates/optimism/evm/src/lib.rs b/crates/optimism/evm/src/lib.rs index 2fa27cf944..1805cc4036 100644 --- a/crates/optimism/evm/src/lib.rs +++ b/crates/optimism/evm/src/lib.rs @@ -13,32 +13,38 @@ extern crate alloc; use alloc::sync::Arc; use alloy_consensus::{BlockHeader, Header}; -use alloy_eips::Decodable2718; use alloy_evm::{EvmFactory, FromRecoveredTx, FromTxWithEncoded}; use alloy_op_evm::block::{receipt_builder::OpReceiptBuilder, OpTxEnv}; -use alloy_primitives::{Bytes, U256}; use core::fmt::Debug; use op_alloy_consensus::EIP1559ParamError; -use op_alloy_rpc_types_engine::OpExecutionData; use op_revm::{OpSpecId, OpTransaction}; use reth_chainspec::EthChainSpec; use reth_evm::{ - eth::NextEvmEnvAttributes, precompiles::PrecompilesMap, ConfigureEngineEvm, ConfigureEvm, - EvmEnv, EvmEnvFor, ExecutableTxIterator, ExecutionCtxFor, TransactionEnv, + eth::NextEvmEnvAttributes, precompiles::PrecompilesMap, ConfigureEvm, EvmEnv, TransactionEnv, }; use reth_optimism_chainspec::OpChainSpec; use reth_optimism_forks::OpHardforks; use reth_optimism_primitives::{DepositReceipt, OpPrimitives}; -use reth_primitives_traits::{ - NodePrimitives, SealedBlock, SealedHeader, SignedTransaction, TxTy, WithEncoded, -}; -use reth_storage_errors::any::AnyError; -use revm::{ - context::{BlockEnv, CfgEnv, TxEnv}, - context_interface::block::BlobExcessGasAndPrice, - primitives::hardfork::SpecId, +use reth_primitives_traits::{NodePrimitives, SealedBlock, SealedHeader, SignedTransaction}; +use revm::context::{BlockEnv, TxEnv}; + +#[allow(unused_imports)] +use { + alloy_eips::Decodable2718, + alloy_primitives::{Bytes, U256}, + op_alloy_rpc_types_engine::OpExecutionData, + reth_evm::{EvmEnvFor, ExecutionCtxFor}, + reth_primitives_traits::{TxTy, WithEncoded}, + reth_storage_errors::any::AnyError, + revm::{ + context::CfgEnv, context_interface::block::BlobExcessGasAndPrice, + primitives::hardfork::SpecId, + }, }; +#[cfg(feature = "std")] +use reth_evm::{ConfigureEngineEvm, ExecutableTxIterator}; + mod config; pub use config::{revm_spec, revm_spec_by_timestamp_after_bedrock, OpNextBlockEnvAttributes}; mod execute; @@ -200,6 +206,7 @@ where } } +#[cfg(feature = "std")] impl ConfigureEngineEvm for OpEvmConfig where ChainSpec: EthChainSpec
+ OpHardforks, @@ -265,7 +272,7 @@ where &self, payload: &OpExecutionData, ) -> Result, Self::Error> { - let transactions = payload.payload.transactions().clone().into_iter(); + let transactions = payload.payload.transactions().clone(); let convert = |encoded: Bytes| { let tx = TxTy::::decode_2718_exact(encoded.as_ref()) .map_err(AnyError::new)?; diff --git a/crates/optimism/node/Cargo.toml b/crates/optimism/node/Cargo.toml index 0576b3897f..313748512b 100644 --- a/crates/optimism/node/Cargo.toml +++ b/crates/optimism/node/Cargo.toml @@ -34,7 +34,7 @@ reth-rpc-api.workspace = true # op-reth reth-optimism-payload-builder.workspace = true -reth-optimism-evm = { workspace = true, features = ["rpc"] } +reth-optimism-evm = { workspace = true, features = ["std", "rpc"] } reth-optimism-rpc.workspace = true reth-optimism-storage.workspace = true reth-optimism-txpool.workspace = true diff --git a/examples/custom-node/src/evm/config.rs b/examples/custom-node/src/evm/config.rs index 92810439a8..c29ac07563 100644 --- a/examples/custom-node/src/evm/config.rs +++ b/examples/custom-node/src/evm/config.rs @@ -127,7 +127,7 @@ impl ConfigureEngineEvm for CustomEvmConfig { &self, payload: &CustomExecutionData, ) -> Result, Self::Error> { - let transactions = payload.inner.payload.transactions().clone().into_iter(); + let transactions = payload.inner.payload.transactions().clone(); let convert = |encoded: Bytes| { let tx = CustomTransaction::decode_2718_exact(encoded.as_ref()) .map_err(Into::into)