Compare commits

...

12 Commits

Author SHA1 Message Date
Alexey Shekhirin
088bb1c1e6 Merge remote-tracking branch 'origin/klkvr/parallel-recovery' into alexey/temp 2025-12-08 13:09:12 +00:00
Arsenii Kulikov
c0c8cdb1eb docs 2025-12-08 16:17:59 +04:00
Arsenii Kulikov
be239feed5 review fixes 2025-12-08 16:02:17 +04:00
Alexey Shekhirin
6b8463146d Merge remote-tracking branch 'origin/main' into klkvr/parallel-recovery 2025-12-08 10:54:37 +00:00
Alexey Shekhirin
450bd12251 Merge branch 'alexey/engine-disable-state-cache' into alexey/temp 2025-12-06 23:38:25 +00:00
Alexey Shekhirin
d8cf50c02e Merge remote-tracking branch 'origin/klkvr/parallel-recovery' into alexey/temp 2025-12-06 19:54:59 +00:00
Arsenii Kulikov
5caeb5200d clippy 2025-12-06 21:09:45 +04:00
Arsenii Kulikov
21a78511dc feat: parallelize recovery 2025-12-06 21:02:06 +04:00
Alexey Shekhirin
ce7cbab250 add pre-post execution duration metrics 2025-12-06 14:35:15 +00:00
Alexey Shekhirin
67f0645d9e tx wait and execution metrics 2025-12-06 14:35:14 +00:00
Alexey Shekhirin
b82afb83dd do not update cache if disabled 2025-12-05 18:49:19 +00:00
Alexey Shekhirin
9cb2398550 feat(engine): cli argument to disable state cache 2025-12-05 15:45:30 +00:00
14 changed files with 245 additions and 80 deletions

View File

@@ -89,6 +89,8 @@ pub struct TreeConfig {
/// Whether to always compare trie updates from the state root task to the trie updates from
/// the regular state root calculation.
always_compare_trie_updates: bool,
/// Whether to disable state cache.
disable_state_cache: bool,
/// Whether to disable parallel prewarming.
disable_prewarming: bool,
/// Whether to disable the parallel sparse trie state root algorithm.
@@ -143,6 +145,7 @@ impl Default for TreeConfig {
max_execute_block_batch_size: DEFAULT_MAX_EXECUTE_BLOCK_BATCH_SIZE,
legacy_state_root: false,
always_compare_trie_updates: false,
disable_state_cache: false,
disable_prewarming: false,
disable_parallel_sparse_trie: false,
state_provider_metrics: false,
@@ -173,6 +176,7 @@ impl TreeConfig {
max_execute_block_batch_size: usize,
legacy_state_root: bool,
always_compare_trie_updates: bool,
disable_state_cache: bool,
disable_prewarming: bool,
disable_parallel_sparse_trie: bool,
state_provider_metrics: bool,
@@ -197,6 +201,7 @@ impl TreeConfig {
max_execute_block_batch_size,
legacy_state_root,
always_compare_trie_updates,
disable_state_cache,
disable_prewarming,
disable_parallel_sparse_trie,
state_provider_metrics,
@@ -271,7 +276,12 @@ impl TreeConfig {
self.disable_parallel_sparse_trie
}
/// Returns whether or not parallel prewarming should be used.
/// Returns whether or not state cache is disabled.
pub const fn disable_state_cache(&self) -> bool {
self.disable_state_cache
}
/// Returns whether or not parallel prewarming is disabled.
pub const fn disable_prewarming(&self) -> bool {
self.disable_prewarming
}
@@ -363,6 +373,12 @@ impl TreeConfig {
self
}
/// Setter for whether to disable state cache.
pub const fn without_state_cache(mut self, disable_state_cache: bool) -> Self {
self.disable_state_cache = disable_state_cache;
self
}
/// Setter for whether to disable parallel prewarming.
pub const fn without_prewarming(mut self, disable_prewarming: bool) -> Self {
self.disable_prewarming = disable_prewarming;

View File

@@ -229,9 +229,15 @@ fn bench_state_root(c: &mut Criterion) {
black_box({
let mut handle = payload_processor.spawn(
Default::default(),
core::iter::empty::<
Result<Recovered<TransactionSigned>, core::convert::Infallible>,
>(),
(
core::iter::empty::<
Result<
Recovered<TransactionSigned>,
core::convert::Infallible,
>,
>(),
std::convert::identity,
),
StateProviderBuilder::new(provider.clone(), genesis_hash, None),
OverlayStateProviderFactory::new(provider),
&TreeConfig::default(),

View File

@@ -63,7 +63,7 @@ impl EngineApiMetrics {
pub(crate) fn execute_metered<E, DB>(
&self,
executor: E,
transactions: impl Iterator<Item = Result<impl ExecutableTx<E>, BlockExecutionError>>,
mut transactions: impl Iterator<Item = Result<impl ExecutableTx<E>, BlockExecutionError>>,
state_hook: Box<dyn OnStateHook>,
) -> Result<(BlockExecutionOutput<E::Receipt>, Vec<Address>), BlockExecutionError>
where
@@ -79,27 +79,42 @@ impl EngineApiMetrics {
let mut executor = executor.with_state_hook(Some(Box::new(wrapper)));
let f = || {
let start = Instant::now();
debug_span!(target: "engine::tree", "pre execution")
.entered()
.in_scope(|| executor.apply_pre_execution_changes())?;
self.executor.pre_execution_histogram.record(start.elapsed());
let exec_span = debug_span!(target: "engine::tree", "execution").entered();
for tx in transactions {
loop {
let start = Instant::now();
let Some(tx) = transactions.next() else { break };
self.executor.transaction_wait_histogram.record(start.elapsed());
let tx = tx?;
senders.push(*tx.signer());
let span =
debug_span!(target: "engine::tree", "execute tx", tx_hash=?tx.tx().tx_hash());
let enter = span.entered();
trace!(target: "engine::tree", "Executing transaction");
senders.push(*tx.signer());
let start = Instant::now();
let gas_used = executor.execute_transaction(tx)?;
self.executor.transaction_execution_histogram.record(start.elapsed());
// record the tx gas used
enter.record("gas_used", gas_used);
}
drop(exec_span);
debug_span!(target: "engine::tree", "finish")
let start = Instant::now();
let result = debug_span!(target: "engine::tree", "finish")
.entered()
.in_scope(|| executor.finish())
.map(|(evm, result)| (evm.into_db(), result))
.map(|(evm, result)| (evm.into_db(), result));
self.executor.post_execution_histogram.record(start.elapsed());
result
};
// Use metered to execute and track timing/gas metrics

View File

@@ -21,6 +21,7 @@ use executor::WorkloadExecutor;
use multiproof::{SparseTrieUpdate, *};
use parking_lot::RwLock;
use prewarm::PrewarmMetrics;
use rayon::iter::{ParallelBridge, ParallelIterator};
use reth_engine_primitives::ExecutableTxIterator;
use reth_evm::{
execute::{ExecutableTxFor, WithTxEnv},
@@ -40,6 +41,7 @@ use reth_trie_sparse::{
};
use reth_trie_sparse_parallel::{ParallelSparseTrie, ParallelismThresholds};
use std::{
collections::BTreeMap,
sync::{
atomic::AtomicBool,
mpsc::{self, channel},
@@ -104,6 +106,8 @@ where
cross_block_cache_size: u64,
/// Whether transactions should not be executed on prewarming task.
disable_transaction_prewarming: bool,
/// Whether state cache should be disable
disable_state_cache: bool,
/// Determines how to configure the evm for execution.
evm_config: Evm,
/// Whether precompile cache should be disabled.
@@ -147,6 +151,7 @@ where
cross_block_cache_size: config.cross_block_cache_size(),
disable_transaction_prewarming: config.disable_prewarming(),
evm_config,
disable_state_cache: config.disable_state_cache(),
precompile_cache_disabled: config.precompile_cache_disabled(),
precompile_cache_map,
sparse_state_trie: Arc::default(),
@@ -312,21 +317,50 @@ where
mpsc::Receiver<Result<WithTxEnv<TxEnvFor<Evm>, I::Tx>, I::Error>>,
usize,
) {
let (transactions, convert) = transactions.into();
let transactions = transactions.into_iter();
// Get the transaction count for prewarming task
// Use upper bound if available (more accurate), otherwise use lower bound
let (lower, upper) = transactions.size_hint();
let transaction_count_hint = upper.unwrap_or(lower);
// Spawn a task that iterates through all transactions in parallel and sends them to the
// main task.
let (tx, rx) = mpsc::channel();
self.executor.spawn_blocking(move || {
transactions.enumerate().par_bridge().for_each_with(tx, |sender, (idx, tx)| {
let tx = convert(tx);
let tx = tx.map(|tx| WithTxEnv { tx_env: tx.to_tx_env(), tx: Arc::new(tx) });
let _ = sender.send((idx, tx));
});
});
// Spawn a task that processes out-of-order transactions from the task above and sends them
// to prewarming and execution tasks.
let (prewarm_tx, prewarm_rx) = mpsc::channel();
let (execute_tx, execute_rx) = mpsc::channel();
self.executor.spawn_blocking(move || {
for tx in transactions {
let tx = tx.map(|tx| WithTxEnv { tx_env: tx.to_tx_env(), tx: Arc::new(tx) });
let mut next_for_execution = 0;
let mut queue = BTreeMap::new();
while let Ok((idx, tx)) = rx.recv() {
// only send Ok(_) variants to prewarming task
if let Ok(tx) = &tx {
let _ = prewarm_tx.send(tx.clone());
}
let _ = execute_tx.send(tx);
if next_for_execution == idx {
let _ = execute_tx.send(tx);
next_for_execution += 1;
while let Some(entry) = queue.first_entry() &&
*entry.key() == next_for_execution
{
let _ = execute_tx.send(entry.remove());
next_for_execution += 1;
}
} else {
queue.insert(idx, tx);
}
}
});
@@ -351,9 +385,15 @@ where
transactions = mpsc::channel().1;
}
let saved_cache = self.cache_for(env.parent_hash);
let cache = saved_cache.cache().clone();
let cache_metrics = saved_cache.metrics().clone();
let (saved_cache, cache, cache_metrics) = if self.disable_state_cache {
(None, None, None)
} else {
let saved_cache = self.cache_for(env.parent_hash);
let cache = saved_cache.cache().clone();
let cache_metrics = saved_cache.metrics().clone();
(Some(saved_cache), Some(cache), Some(cache_metrics))
};
// configure prewarming
let prewarm_ctx = PrewarmContext {
env,
@@ -565,12 +605,12 @@ impl<Tx, Err> PayloadHandle<Tx, Err> {
}
/// Returns a clone of the caches used by prewarming
pub(super) fn caches(&self) -> StateExecutionCache {
pub(super) fn caches(&self) -> Option<StateExecutionCache> {
self.prewarm_handle.cache.clone()
}
/// Returns a clone of the cache metrics used by prewarming
pub(super) fn cache_metrics(&self) -> CachedStateMetrics {
pub(super) fn cache_metrics(&self) -> Option<CachedStateMetrics> {
self.prewarm_handle.cache_metrics.clone()
}
@@ -600,9 +640,9 @@ impl<Tx, Err> PayloadHandle<Tx, Err> {
#[derive(Debug)]
pub(crate) struct CacheTaskHandle {
/// The shared cache the task operates with.
cache: StateExecutionCache,
cache: Option<StateExecutionCache>,
/// Metrics for the caches
cache_metrics: CachedStateMetrics,
cache_metrics: Option<CachedStateMetrics>,
/// Channel to the spawned prewarm task if any
to_prewarm_task: Option<std::sync::mpsc::Sender<PrewarmTaskEvent>>,
}
@@ -1017,13 +1057,19 @@ mod tests {
let provider_factory = BlockchainProvider::new(factory).unwrap();
let mut handle = payload_processor.spawn(
Default::default(),
core::iter::empty::<Result<Recovered<TransactionSigned>, core::convert::Infallible>>(),
StateProviderBuilder::new(provider_factory.clone(), genesis_hash, None),
OverlayStateProviderFactory::new(provider_factory),
&TreeConfig::default(),
);
let mut handle =
payload_processor.spawn(
Default::default(),
(
core::iter::empty::<
Result<Recovered<TransactionSigned>, core::convert::Infallible>,
>(),
std::convert::identity,
),
StateProviderBuilder::new(provider_factory.clone(), genesis_hash, None),
OverlayStateProviderFactory::new(provider_factory),
&TreeConfig::default(),
);
let mut state_hook = handle.state_hook();

View File

@@ -29,7 +29,7 @@ use metrics::{Counter, Gauge, Histogram};
use reth_evm::{execute::ExecutableTxFor, ConfigureEvm, Evm, EvmFor, SpecFor};
use reth_metrics::Metrics;
use reth_primitives_traits::NodePrimitives;
use reth_provider::{BlockReader, StateProviderFactory, StateReader};
use reth_provider::{BlockReader, StateProviderBox, StateProviderFactory, StateReader};
use reth_revm::{database::StateProviderDatabase, db::BundleState, state::EvmState};
use reth_trie::MultiProofTargets;
use std::{
@@ -255,31 +255,35 @@ where
self;
let hash = env.hash;
debug!(target: "engine::caching", parent_hash=?hash, "Updating execution cache");
// Perform all cache operations atomically under the lock
execution_cache.update_with_guard(|cached| {
// consumes the `SavedCache` held by the prewarming task, which releases its usage guard
let (caches, cache_metrics) = saved_cache.split();
let new_cache = SavedCache::new(hash, caches, cache_metrics);
if let Some(saved_cache) = saved_cache {
debug!(target: "engine::caching", parent_hash=?hash, "Updating execution cache");
// Perform all cache operations atomically under the lock
execution_cache.update_with_guard(|cached| {
// consumes the `SavedCache` held by the prewarming task, which releases its usage
// guard
let (caches, cache_metrics) = saved_cache.split();
let new_cache = SavedCache::new(hash, caches, cache_metrics);
// Insert state into cache while holding the lock
if new_cache.cache().insert_state(&state).is_err() {
// Clear the cache on error to prevent having a polluted cache
*cached = None;
debug!(target: "engine::caching", "cleared execution cache on update error");
return;
}
// Insert state into cache while holding the lock
if new_cache.cache().insert_state(&state).is_err() {
// Clear the cache on error to prevent having a polluted cache
*cached = None;
debug!(target: "engine::caching", "cleared execution cache on update error");
return;
}
new_cache.update_metrics();
new_cache.update_metrics();
// Replace the shared cache with the new one; the previous cache (if any) is dropped.
*cached = Some(new_cache);
});
// Replace the shared cache with the new one; the previous cache (if any) is
// dropped.
*cached = Some(new_cache);
});
let elapsed = start.elapsed();
debug!(target: "engine::caching", parent_hash=?hash, elapsed=?elapsed, "Updated execution cache");
let elapsed = start.elapsed();
debug!(target: "engine::caching", parent_hash=?hash, elapsed=?elapsed, "Updated execution cache");
metrics.cache_saving_duration.set(elapsed.as_secs_f64());
metrics.cache_saving_duration.set(elapsed.as_secs_f64());
}
}
/// Executes the task.
@@ -356,7 +360,7 @@ where
{
pub(super) env: ExecutionEnv<Evm>,
pub(super) evm_config: Evm,
pub(super) saved_cache: SavedCache,
pub(super) saved_cache: Option<SavedCache>,
/// Provider to obtain the state
pub(super) provider: StateProviderBuilder<N, P>,
pub(super) metrics: PrewarmMetrics,
@@ -400,10 +404,13 @@ where
};
// Use the caches to create a new provider with caching
let caches = saved_cache.cache().clone();
let cache_metrics = saved_cache.metrics().clone();
let state_provider =
CachedStateProvider::new_with_caches(state_provider, caches, cache_metrics);
let state_provider: StateProviderBox = if let Some(saved_cache) = saved_cache {
let caches = saved_cache.cache().clone();
let cache_metrics = saved_cache.metrics().clone();
Box::new(CachedStateProvider::new_with_caches(state_provider, caches, cache_metrics))
} else {
state_provider
};
let state_provider = StateProviderDatabase::new(state_provider);

View File

@@ -213,16 +213,31 @@ where
Evm: ConfigureEngineEvm<T::ExecutionData, Primitives = N>,
{
match input {
BlockOrPayload::Payload(payload) => Ok(Either::Left(
self.evm_config
BlockOrPayload::Payload(payload) => {
let (iter, convert) = self
.evm_config
.tx_iterator_for_payload(payload)
.map_err(NewPayloadError::other)?
.map(|res| res.map(Either::Left).map_err(NewPayloadError::other)),
)),
.into();
let iter = Either::Left(iter.into_iter().map(Either::Left));
let convert = move |tx| {
let Either::Left(tx) = tx else { unreachable!() };
convert(tx).map(Either::Left).map_err(Either::Left)
};
// Box the closure to satisfy the `Fn` bound both here and in the branch below
Ok((iter, Box::new(convert) as Box<dyn Fn(_) -> _ + Send + Sync + 'static>))
}
BlockOrPayload::Block(block) => {
Ok(Either::Right(block.body().clone_transactions().into_iter().map(|tx| {
Ok(Either::Right(tx.try_into_recovered().map_err(NewPayloadError::other)?))
})))
let iter =
Either::Right(block.body().clone_transactions().into_iter().map(Either::Right));
let convert = move |tx: Either<_, N::SignedTx>| {
let Either::Right(tx) = tx else { unreachable!() };
tx.try_into_recovered().map(Either::Right).map_err(Either::Right)
};
Ok((iter, Box::new(convert)))
}
}
}
@@ -353,7 +368,7 @@ where
)
.into())
};
let state_provider = ensure_ok!(provider_builder.build());
let mut state_provider = ensure_ok!(provider_builder.build());
drop(_enter);
// fetch parent block
@@ -396,11 +411,13 @@ where
// Use cached state provider before executing, used in execution after prewarming threads
// complete
let state_provider = CachedStateProvider::new_with_caches(
state_provider,
handle.caches(),
handle.cache_metrics(),
);
if let Some((caches, cache_metrics)) = handle.caches().zip(handle.cache_metrics()) {
state_provider = Box::new(CachedStateProvider::new_with_caches(
state_provider,
caches,
cache_metrics,
));
};
// Execute the block and handle any execution errors
let (output, senders) = match if self.config.state_provider_metrics() {

View File

@@ -289,12 +289,15 @@ where
&self,
payload: &ExecutionData,
) -> Result<impl ExecutableTxIterator<Self>, Self::Error> {
Ok(payload.payload.transactions().clone().into_iter().map(|tx| {
let txs = payload.payload.transactions().clone().into_iter();
let convert = |tx: Bytes| {
let tx =
TxTy::<Self::Primitives>::decode_2718_exact(tx.as_ref()).map_err(AnyError::new)?;
let signer = tx.try_recover().map_err(AnyError::new)?;
Ok::<_, AnyError>(tx.with_signer(signer))
}))
};
Ok((txs, convert))
}
}

View File

@@ -18,22 +18,51 @@ pub trait ConfigureEngineEvm<ExecutionData>: ConfigureEvm {
) -> Result<impl ExecutableTxIterator<Self>, Self::Error>;
}
/// Iterator over executable transactions.
pub trait ExecutableTxIterator<Evm: ConfigureEvm>:
Iterator<Item = Result<Self::Tx, Self::Error>> + Send + 'static
{
/// A helper trait representing a pair of a "raw" transactions iterator and a closure that can be
/// used to convert them to an executable transaction. This tuple is used in the engine to
/// parallelize heavy work like decoding or recovery.
pub trait ExecutableTxTuple: Into<(Self::Iter, Self::Convert)> + Send + 'static {
/// Raw transaction that can be converted to an [`ExecutableTxTuple::Tx`]
///
/// This can be any type that can be converted to an [`ExecutableTxTuple::Tx`]. For example,
/// an unrecovered transaction or just the transaction bytes.
type RawTx: Send + Sync + 'static;
/// The executable transaction type iterator yields.
type Tx: ExecutableTxFor<Evm> + Clone + Send + Sync + 'static;
type Tx: Clone + Send + Sync + 'static;
/// Errors that may occur while recovering or decoding transactions.
type Error: core::error::Error + Send + Sync + 'static;
/// Iterator over [`ExecutableTxTuple::Tx`]
type Iter: Iterator<Item = Self::RawTx> + Send + 'static;
/// Closure that can be used to convert a [`ExecutableTxTuple::RawTx`] to a
/// [`ExecutableTxTuple::Tx`]. This might involve heavy work like decoding or recovery
/// and will be parallelized in the engine.
type Convert: Fn(Self::RawTx) -> Result<Self::Tx, Self::Error> + Send + Sync + 'static;
}
impl<Evm: ConfigureEvm, Tx, Err, T> ExecutableTxIterator<Evm> for T
impl<RawTx, Tx, Err, I, F> ExecutableTxTuple for (I, F)
where
Tx: ExecutableTxFor<Evm> + Clone + Send + Sync + 'static,
RawTx: Send + Sync + 'static,
Tx: Clone + Send + Sync + 'static,
Err: core::error::Error + Send + Sync + 'static,
T: Iterator<Item = Result<Tx, Err>> + Send + 'static,
I: Iterator<Item = RawTx> + Send + 'static,
F: Fn(RawTx) -> Result<Tx, Err> + Send + Sync + 'static,
{
type RawTx = RawTx;
type Tx = Tx;
type Error = Err;
type Iter = I;
type Convert = F;
}
/// Iterator over executable transactions.
pub trait ExecutableTxIterator<Evm: ConfigureEvm>:
ExecutableTxTuple<Tx: ExecutableTxFor<Evm>>
{
}
impl<T, Evm: ConfigureEvm> ExecutableTxIterator<Evm> for T where
T: ExecutableTxTuple<Tx: ExecutableTxFor<Evm>>
{
}

View File

@@ -17,6 +17,14 @@ pub struct ExecutorMetrics {
/// The Histogram for amount of gas used.
pub gas_used_histogram: Histogram,
/// The Histogram for amount of time taken to execute the pre-execution changes.
pub pre_execution_histogram: Histogram,
/// The Histogram for amount of time taken to wait for one transaction to be available.
pub transaction_wait_histogram: Histogram,
/// The Histogram for amount of time taken to execute one transaction.
pub transaction_execution_histogram: Histogram,
/// The Histogram for amount of time taken to execute the post-execution changes.
pub post_execution_histogram: Histogram,
/// The Histogram for amount of time taken to execute blocks.
pub execution_histogram: Histogram,
/// The total amount of time it took to execute the latest block.

View File

@@ -35,6 +35,10 @@ pub struct EngineArgs {
#[deprecated]
pub caching_and_prewarming_enabled: bool,
/// Disable state cache
#[arg(long = "engine.disable-state-cache")]
pub state_cache_disabled: bool,
/// Disable parallel prewarming
#[arg(long = "engine.disable-prewarming", alias = "engine.disable-caching-and-prewarming")]
pub prewarming_disabled: bool,
@@ -130,6 +134,7 @@ impl Default for EngineArgs {
legacy_state_root_task_enabled: false,
state_root_task_compare_updates: false,
caching_and_prewarming_enabled: true,
state_cache_disabled: false,
prewarming_disabled: false,
parallel_sparse_trie_enabled: true,
parallel_sparse_trie_disabled: false,
@@ -157,6 +162,7 @@ impl EngineArgs {
.with_persistence_threshold(self.persistence_threshold)
.with_memory_block_buffer_target(self.memory_block_buffer_target)
.with_legacy_state_root(self.legacy_state_root_task_enabled)
.without_state_cache(self.state_cache_disabled)
.without_prewarming(self.prewarming_disabled)
.with_disable_parallel_sparse_trie(self.parallel_sparse_trie_disabled)
.with_state_provider_metrics(self.state_provider_metrics)

View File

@@ -16,7 +16,7 @@ use alloy_consensus::{BlockHeader, Header};
use alloy_eips::Decodable2718;
use alloy_evm::{EvmFactory, FromRecoveredTx, FromTxWithEncoded};
use alloy_op_evm::block::{receipt_builder::OpReceiptBuilder, OpTxEnv};
use alloy_primitives::U256;
use alloy_primitives::{Bytes, U256};
use core::fmt::Debug;
use op_alloy_consensus::EIP1559ParamError;
use op_alloy_rpc_types_engine::OpExecutionData;
@@ -265,12 +265,15 @@ where
&self,
payload: &OpExecutionData,
) -> Result<impl ExecutableTxIterator<Self>, Self::Error> {
Ok(payload.payload.transactions().clone().into_iter().map(|encoded| {
let transactions = payload.payload.transactions().clone().into_iter();
let convert = |encoded: Bytes| {
let tx = TxTy::<Self::Primitives>::decode_2718_exact(encoded.as_ref())
.map_err(AnyError::new)?;
let signer = tx.try_recover().map_err(AnyError::new)?;
Ok::<_, AnyError>(WithEncoded::new(encoded, tx.with_signer(signer)))
}))
};
Ok((transactions, convert))
}
}

View File

@@ -885,6 +885,9 @@ Engine:
--engine.legacy-state-root
Enable legacy state root
--engine.disable-state-cache
Disable state cache
--engine.disable-prewarming
Disable parallel prewarming

View File

@@ -885,6 +885,9 @@ Engine:
--engine.legacy-state-root
Enable legacy state root
--engine.disable-state-cache
Disable state cache
--engine.disable-prewarming
Disable parallel prewarming

View File

@@ -25,6 +25,7 @@ use reth_op::{
primitives::SignedTransaction,
};
use reth_rpc_api::eth::helpers::pending_block::BuildPendingEnv;
use revm_primitives::Bytes;
use std::sync::Arc;
#[derive(Debug, Clone)]
@@ -126,13 +127,15 @@ impl ConfigureEngineEvm<CustomExecutionData> for CustomEvmConfig {
&self,
payload: &CustomExecutionData,
) -> Result<impl ExecutableTxIterator<Self>, Self::Error> {
Ok(payload.inner.payload.transactions().clone().into_iter().map(|encoded| {
let transactions = payload.inner.payload.transactions().clone().into_iter();
let convert = |encoded: Bytes| {
let tx = CustomTransaction::decode_2718_exact(encoded.as_ref())
.map_err(Into::into)
.map_err(PayloadError::Decode)?;
let signer = tx.try_recover().map_err(NewPayloadError::other)?;
Ok::<_, NewPayloadError>(WithEncoded::new(encoded, tx.with_signer(signer)))
}))
};
Ok((transactions, convert))
}
}