mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-02-19 03:04:27 -05:00
feat(evm): impl ExecutableTxTuple for Either via EitherTxIterator (#22102)
Co-authored-by: DaniPopes <57450786+DaniPopes@users.noreply.github.com> Co-authored-by: Amp <amp@ampcode.com>
This commit is contained in:
committed by
GitHub
parent
90e15d096d
commit
a0b60b7e64
@@ -23,7 +23,7 @@ use serde::{de::DeserializeOwned, Serialize};
|
||||
|
||||
// Re-export [`ExecutionPayload`] moved to `reth_payload_primitives`
|
||||
#[cfg(feature = "std")]
|
||||
pub use reth_evm::{ConfigureEngineEvm, ExecutableTxIterator, ExecutableTxTuple};
|
||||
pub use reth_evm::{ConfigureEngineEvm, ConvertTx, ExecutableTxIterator, ExecutableTxTuple};
|
||||
pub use reth_payload_primitives::ExecutionPayload;
|
||||
|
||||
mod error;
|
||||
|
||||
@@ -23,8 +23,8 @@ use rayon::prelude::*;
|
||||
use reth_evm::{
|
||||
block::ExecutableTxParts,
|
||||
execute::{ExecutableTxFor, WithTxEnv},
|
||||
ConfigureEvm, EvmEnvFor, ExecutableTxIterator, ExecutableTxTuple, OnStateHook, SpecFor,
|
||||
TxEnvFor,
|
||||
ConfigureEvm, ConvertTx, EvmEnvFor, ExecutableTxIterator, ExecutableTxTuple, OnStateHook,
|
||||
SpecFor, TxEnvFor,
|
||||
};
|
||||
use reth_metrics::Metrics;
|
||||
use reth_primitives_traits::NodePrimitives;
|
||||
@@ -370,9 +370,9 @@ where
|
||||
|
||||
// Spawn a task that `convert`s all transactions in parallel and sends them out-of-order.
|
||||
rayon::spawn(move || {
|
||||
let (transactions, convert) = transactions.into();
|
||||
let (transactions, convert) = transactions.into_parts();
|
||||
transactions.into_par_iter().enumerate().for_each_with(ooo_tx, |ooo_tx, (idx, tx)| {
|
||||
let tx = convert(tx);
|
||||
let tx = convert.convert(tx);
|
||||
let tx = tx.map(|tx| {
|
||||
let (tx_env, tx) = tx.into_parts();
|
||||
WithTxEnv { tx_env, tx: Arc::new(tx) }
|
||||
|
||||
@@ -17,7 +17,6 @@ use alloy_evm::Evm;
|
||||
use alloy_primitives::B256;
|
||||
|
||||
use crate::tree::payload_processor::receipt_root_task::{IndexedReceipt, ReceiptRootTaskHandle};
|
||||
use rayon::prelude::*;
|
||||
use reth_chain_state::{CanonicalInMemoryState, DeferredTrieData, ExecutedBlock, LazyOverlay};
|
||||
use reth_consensus::{ConsensusError, FullConsensus, ReceiptRootBloom};
|
||||
use reth_engine_primitives::{
|
||||
@@ -232,35 +231,20 @@ where
|
||||
V: PayloadValidator<T, Block = N::Block>,
|
||||
Evm: ConfigureEngineEvm<T::ExecutionData, Primitives = N>,
|
||||
{
|
||||
match input {
|
||||
Ok(match input {
|
||||
BlockOrPayload::Payload(payload) => {
|
||||
let (iter, convert) = self
|
||||
let iter = self
|
||||
.evm_config
|
||||
.tx_iterator_for_payload(payload)
|
||||
.map_err(NewPayloadError::other)?
|
||||
.into();
|
||||
|
||||
let iter = Either::Left(iter.into_par_iter().map(Either::Left));
|
||||
let convert = move |tx| {
|
||||
let Either::Left(tx) = tx else { unreachable!() };
|
||||
convert(tx).map(Either::Left).map_err(Either::Left)
|
||||
};
|
||||
|
||||
// Box the closure to satisfy the `Fn` bound both here and in the branch below
|
||||
Ok((iter, Box::new(convert) as Box<dyn Fn(_) -> _ + Send + Sync + 'static>))
|
||||
.map_err(NewPayloadError::other)?;
|
||||
Either::Left(iter)
|
||||
}
|
||||
BlockOrPayload::Block(block) => {
|
||||
let iter = Either::Right(
|
||||
block.body().clone_transactions().into_par_iter().map(Either::Right),
|
||||
);
|
||||
let convert = move |tx: Either<_, N::SignedTx>| {
|
||||
let Either::Right(tx) = tx else { unreachable!() };
|
||||
tx.try_into_recovered().map(Either::Right).map_err(Either::Right)
|
||||
};
|
||||
|
||||
Ok((iter, Box::new(convert)))
|
||||
let txs = block.body().clone_transactions();
|
||||
let convert = |tx: N::SignedTx| tx.try_into_recovered();
|
||||
Either::Right((txs, convert))
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
/// Returns a [`ExecutionCtxFor`] for the given payload or block.
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
use crate::{execute::ExecutableTxFor, ConfigureEvm, EvmEnvFor, ExecutionCtxFor, TxEnvFor};
|
||||
use alloy_consensus::transaction::Either;
|
||||
use alloy_evm::{block::ExecutableTxParts, RecoveredTx};
|
||||
use rayon::prelude::*;
|
||||
use reth_primitives_traits::TxTy;
|
||||
@@ -21,10 +22,55 @@ pub trait ConfigureEngineEvm<ExecutionData>: ConfigureEvm {
|
||||
) -> Result<impl ExecutableTxIterator<Self>, Self::Error>;
|
||||
}
|
||||
|
||||
/// Converts a raw transaction into an executable transaction.
|
||||
///
|
||||
/// This trait abstracts the conversion logic (e.g., decoding, signature recovery) that is
|
||||
/// parallelized in the engine.
|
||||
pub trait ConvertTx<RawTx>: Send + Sync + 'static {
|
||||
/// The executable transaction type.
|
||||
type Tx;
|
||||
/// Errors that may occur during conversion.
|
||||
type Error;
|
||||
/// Converts a raw transaction.
|
||||
fn convert(&self, raw: RawTx) -> Result<Self::Tx, Self::Error>;
|
||||
}
|
||||
|
||||
// Blanket impl so closures still work.
|
||||
impl<F, RawTx, Tx, Err> ConvertTx<RawTx> for F
|
||||
where
|
||||
F: Fn(RawTx) -> Result<Tx, Err> + Send + Sync + 'static,
|
||||
{
|
||||
type Tx = Tx;
|
||||
type Error = Err;
|
||||
fn convert(&self, raw: RawTx) -> Result<Tx, Err> {
|
||||
self(raw)
|
||||
}
|
||||
}
|
||||
|
||||
impl<A, B, RA, RB> ConvertTx<Either<RA, RB>> for Either<A, B>
|
||||
where
|
||||
A: ConvertTx<RA>,
|
||||
B: ConvertTx<RB>,
|
||||
{
|
||||
type Tx = Either<A::Tx, B::Tx>;
|
||||
type Error = Either<A::Error, B::Error>;
|
||||
fn convert(&self, raw: Either<RA, RB>) -> Result<Self::Tx, Self::Error> {
|
||||
match (self, raw) {
|
||||
(Self::Left(a), Either::Left(raw)) => {
|
||||
a.convert(raw).map(Either::Left).map_err(Either::Left)
|
||||
}
|
||||
(Self::Right(b), Either::Right(raw)) => {
|
||||
b.convert(raw).map(Either::Right).map_err(Either::Right)
|
||||
}
|
||||
_ => unreachable!(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A helper trait representing a pair of a "raw" transactions iterator and a closure that can be
|
||||
/// used to convert them to an executable transaction. This tuple is used in the engine to
|
||||
/// parallelize heavy work like decoding or recovery.
|
||||
pub trait ExecutableTxTuple: Into<(Self::IntoIter, Self::Convert)> + Send + 'static {
|
||||
pub trait ExecutableTxTuple: Send + 'static {
|
||||
/// Raw transaction that can be converted to an [`ExecutableTxTuple::Tx`]
|
||||
///
|
||||
/// This can be any type that can be converted to an [`ExecutableTxTuple::Tx`]. For example,
|
||||
@@ -39,10 +85,13 @@ pub trait ExecutableTxTuple: Into<(Self::IntoIter, Self::Convert)> + Send + 'sta
|
||||
type IntoIter: IntoParallelIterator<Item = Self::RawTx, Iter: IndexedParallelIterator>
|
||||
+ Send
|
||||
+ 'static;
|
||||
/// Closure that can be used to convert a [`ExecutableTxTuple::RawTx`] to a
|
||||
/// Converter that can be used to convert a [`ExecutableTxTuple::RawTx`] to a
|
||||
/// [`ExecutableTxTuple::Tx`]. This might involve heavy work like decoding or recovery
|
||||
/// and will be parallelized in the engine.
|
||||
type Convert: Fn(Self::RawTx) -> Result<Self::Tx, Self::Error> + Send + Sync + 'static;
|
||||
type Convert: ConvertTx<Self::RawTx, Tx = Self::Tx, Error = Self::Error>;
|
||||
|
||||
/// Decomposes into the raw transaction iterator and converter.
|
||||
fn into_parts(self) -> (Self::IntoIter, Self::Convert);
|
||||
}
|
||||
|
||||
impl<RawTx, Tx, Err, I, F> ExecutableTxTuple for (I, F)
|
||||
@@ -59,6 +108,10 @@ where
|
||||
|
||||
type IntoIter = I;
|
||||
type Convert = F;
|
||||
|
||||
fn into_parts(self) -> (I, F) {
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
/// Iterator over executable transactions.
|
||||
@@ -76,3 +129,45 @@ where
|
||||
{
|
||||
type Recovered = <T::Tx as ExecutableTxParts<TxEnvFor<Evm>, TxTy<Evm::Primitives>>>::Recovered;
|
||||
}
|
||||
|
||||
impl<A: ExecutableTxTuple, B: ExecutableTxTuple> ExecutableTxTuple for Either<A, B> {
|
||||
type RawTx = Either<A::RawTx, B::RawTx>;
|
||||
type Tx = Either<A::Tx, B::Tx>;
|
||||
type Error = Either<A::Error, B::Error>;
|
||||
type IntoIter = Either<
|
||||
rayon::iter::Map<
|
||||
<A::IntoIter as IntoParallelIterator>::Iter,
|
||||
fn(A::RawTx) -> Either<A::RawTx, B::RawTx>,
|
||||
>,
|
||||
rayon::iter::Map<
|
||||
<B::IntoIter as IntoParallelIterator>::Iter,
|
||||
fn(B::RawTx) -> Either<A::RawTx, B::RawTx>,
|
||||
>,
|
||||
>;
|
||||
type Convert = Either<A::Convert, B::Convert>;
|
||||
|
||||
fn into_parts(self) -> (Self::IntoIter, Self::Convert) {
|
||||
match self {
|
||||
Self::Left(a) => {
|
||||
let (iter, convert) = a.into_parts();
|
||||
(
|
||||
Either::Left(
|
||||
iter.into_par_iter()
|
||||
.map(Either::Left as fn(A::RawTx) -> Either<A::RawTx, B::RawTx>),
|
||||
),
|
||||
Either::Left(convert),
|
||||
)
|
||||
}
|
||||
Self::Right(b) => {
|
||||
let (iter, convert) = b.into_parts();
|
||||
(
|
||||
Either::Right(
|
||||
iter.into_par_iter()
|
||||
.map(Either::Right as fn(B::RawTx) -> Either<A::RawTx, B::RawTx>),
|
||||
),
|
||||
Either::Right(convert),
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -47,7 +47,7 @@ pub use aliases::*;
|
||||
#[cfg(feature = "std")]
|
||||
mod engine;
|
||||
#[cfg(feature = "std")]
|
||||
pub use engine::{ConfigureEngineEvm, ExecutableTxIterator, ExecutableTxTuple};
|
||||
pub use engine::{ConfigureEngineEvm, ConvertTx, ExecutableTxIterator, ExecutableTxTuple};
|
||||
|
||||
#[cfg(feature = "metrics")]
|
||||
pub mod metrics;
|
||||
|
||||
Reference in New Issue
Block a user