mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-04-30 03:01:58 -04:00
Compare commits
12 Commits
devnet4
...
alexey/tem
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
088bb1c1e6 | ||
|
|
c0c8cdb1eb | ||
|
|
be239feed5 | ||
|
|
6b8463146d | ||
|
|
450bd12251 | ||
|
|
d8cf50c02e | ||
|
|
5caeb5200d | ||
|
|
21a78511dc | ||
|
|
ce7cbab250 | ||
|
|
67f0645d9e | ||
|
|
b82afb83dd | ||
|
|
9cb2398550 |
@@ -89,6 +89,8 @@ pub struct TreeConfig {
|
||||
/// Whether to always compare trie updates from the state root task to the trie updates from
|
||||
/// the regular state root calculation.
|
||||
always_compare_trie_updates: bool,
|
||||
/// Whether to disable state cache.
|
||||
disable_state_cache: bool,
|
||||
/// Whether to disable parallel prewarming.
|
||||
disable_prewarming: bool,
|
||||
/// Whether to disable the parallel sparse trie state root algorithm.
|
||||
@@ -143,6 +145,7 @@ impl Default for TreeConfig {
|
||||
max_execute_block_batch_size: DEFAULT_MAX_EXECUTE_BLOCK_BATCH_SIZE,
|
||||
legacy_state_root: false,
|
||||
always_compare_trie_updates: false,
|
||||
disable_state_cache: false,
|
||||
disable_prewarming: false,
|
||||
disable_parallel_sparse_trie: false,
|
||||
state_provider_metrics: false,
|
||||
@@ -173,6 +176,7 @@ impl TreeConfig {
|
||||
max_execute_block_batch_size: usize,
|
||||
legacy_state_root: bool,
|
||||
always_compare_trie_updates: bool,
|
||||
disable_state_cache: bool,
|
||||
disable_prewarming: bool,
|
||||
disable_parallel_sparse_trie: bool,
|
||||
state_provider_metrics: bool,
|
||||
@@ -197,6 +201,7 @@ impl TreeConfig {
|
||||
max_execute_block_batch_size,
|
||||
legacy_state_root,
|
||||
always_compare_trie_updates,
|
||||
disable_state_cache,
|
||||
disable_prewarming,
|
||||
disable_parallel_sparse_trie,
|
||||
state_provider_metrics,
|
||||
@@ -271,7 +276,12 @@ impl TreeConfig {
|
||||
self.disable_parallel_sparse_trie
|
||||
}
|
||||
|
||||
/// Returns whether or not parallel prewarming should be used.
|
||||
/// Returns whether or not state cache is disabled.
|
||||
pub const fn disable_state_cache(&self) -> bool {
|
||||
self.disable_state_cache
|
||||
}
|
||||
|
||||
/// Returns whether or not parallel prewarming is disabled.
|
||||
pub const fn disable_prewarming(&self) -> bool {
|
||||
self.disable_prewarming
|
||||
}
|
||||
@@ -363,6 +373,12 @@ impl TreeConfig {
|
||||
self
|
||||
}
|
||||
|
||||
/// Setter for whether to disable state cache.
|
||||
pub const fn without_state_cache(mut self, disable_state_cache: bool) -> Self {
|
||||
self.disable_state_cache = disable_state_cache;
|
||||
self
|
||||
}
|
||||
|
||||
/// Setter for whether to disable parallel prewarming.
|
||||
pub const fn without_prewarming(mut self, disable_prewarming: bool) -> Self {
|
||||
self.disable_prewarming = disable_prewarming;
|
||||
|
||||
@@ -229,9 +229,15 @@ fn bench_state_root(c: &mut Criterion) {
|
||||
black_box({
|
||||
let mut handle = payload_processor.spawn(
|
||||
Default::default(),
|
||||
core::iter::empty::<
|
||||
Result<Recovered<TransactionSigned>, core::convert::Infallible>,
|
||||
>(),
|
||||
(
|
||||
core::iter::empty::<
|
||||
Result<
|
||||
Recovered<TransactionSigned>,
|
||||
core::convert::Infallible,
|
||||
>,
|
||||
>(),
|
||||
std::convert::identity,
|
||||
),
|
||||
StateProviderBuilder::new(provider.clone(), genesis_hash, None),
|
||||
OverlayStateProviderFactory::new(provider),
|
||||
&TreeConfig::default(),
|
||||
|
||||
@@ -63,7 +63,7 @@ impl EngineApiMetrics {
|
||||
pub(crate) fn execute_metered<E, DB>(
|
||||
&self,
|
||||
executor: E,
|
||||
transactions: impl Iterator<Item = Result<impl ExecutableTx<E>, BlockExecutionError>>,
|
||||
mut transactions: impl Iterator<Item = Result<impl ExecutableTx<E>, BlockExecutionError>>,
|
||||
state_hook: Box<dyn OnStateHook>,
|
||||
) -> Result<(BlockExecutionOutput<E::Receipt>, Vec<Address>), BlockExecutionError>
|
||||
where
|
||||
@@ -79,27 +79,42 @@ impl EngineApiMetrics {
|
||||
let mut executor = executor.with_state_hook(Some(Box::new(wrapper)));
|
||||
|
||||
let f = || {
|
||||
let start = Instant::now();
|
||||
debug_span!(target: "engine::tree", "pre execution")
|
||||
.entered()
|
||||
.in_scope(|| executor.apply_pre_execution_changes())?;
|
||||
self.executor.pre_execution_histogram.record(start.elapsed());
|
||||
|
||||
let exec_span = debug_span!(target: "engine::tree", "execution").entered();
|
||||
for tx in transactions {
|
||||
loop {
|
||||
let start = Instant::now();
|
||||
let Some(tx) = transactions.next() else { break };
|
||||
self.executor.transaction_wait_histogram.record(start.elapsed());
|
||||
|
||||
let tx = tx?;
|
||||
senders.push(*tx.signer());
|
||||
|
||||
let span =
|
||||
debug_span!(target: "engine::tree", "execute tx", tx_hash=?tx.tx().tx_hash());
|
||||
let enter = span.entered();
|
||||
trace!(target: "engine::tree", "Executing transaction");
|
||||
senders.push(*tx.signer());
|
||||
let start = Instant::now();
|
||||
let gas_used = executor.execute_transaction(tx)?;
|
||||
self.executor.transaction_execution_histogram.record(start.elapsed());
|
||||
|
||||
// record the tx gas used
|
||||
enter.record("gas_used", gas_used);
|
||||
}
|
||||
drop(exec_span);
|
||||
debug_span!(target: "engine::tree", "finish")
|
||||
|
||||
let start = Instant::now();
|
||||
let result = debug_span!(target: "engine::tree", "finish")
|
||||
.entered()
|
||||
.in_scope(|| executor.finish())
|
||||
.map(|(evm, result)| (evm.into_db(), result))
|
||||
.map(|(evm, result)| (evm.into_db(), result));
|
||||
self.executor.post_execution_histogram.record(start.elapsed());
|
||||
|
||||
result
|
||||
};
|
||||
|
||||
// Use metered to execute and track timing/gas metrics
|
||||
|
||||
@@ -21,6 +21,7 @@ use executor::WorkloadExecutor;
|
||||
use multiproof::{SparseTrieUpdate, *};
|
||||
use parking_lot::RwLock;
|
||||
use prewarm::PrewarmMetrics;
|
||||
use rayon::iter::{ParallelBridge, ParallelIterator};
|
||||
use reth_engine_primitives::ExecutableTxIterator;
|
||||
use reth_evm::{
|
||||
execute::{ExecutableTxFor, WithTxEnv},
|
||||
@@ -40,6 +41,7 @@ use reth_trie_sparse::{
|
||||
};
|
||||
use reth_trie_sparse_parallel::{ParallelSparseTrie, ParallelismThresholds};
|
||||
use std::{
|
||||
collections::BTreeMap,
|
||||
sync::{
|
||||
atomic::AtomicBool,
|
||||
mpsc::{self, channel},
|
||||
@@ -104,6 +106,8 @@ where
|
||||
cross_block_cache_size: u64,
|
||||
/// Whether transactions should not be executed on prewarming task.
|
||||
disable_transaction_prewarming: bool,
|
||||
/// Whether state cache should be disable
|
||||
disable_state_cache: bool,
|
||||
/// Determines how to configure the evm for execution.
|
||||
evm_config: Evm,
|
||||
/// Whether precompile cache should be disabled.
|
||||
@@ -147,6 +151,7 @@ where
|
||||
cross_block_cache_size: config.cross_block_cache_size(),
|
||||
disable_transaction_prewarming: config.disable_prewarming(),
|
||||
evm_config,
|
||||
disable_state_cache: config.disable_state_cache(),
|
||||
precompile_cache_disabled: config.precompile_cache_disabled(),
|
||||
precompile_cache_map,
|
||||
sparse_state_trie: Arc::default(),
|
||||
@@ -312,21 +317,50 @@ where
|
||||
mpsc::Receiver<Result<WithTxEnv<TxEnvFor<Evm>, I::Tx>, I::Error>>,
|
||||
usize,
|
||||
) {
|
||||
let (transactions, convert) = transactions.into();
|
||||
let transactions = transactions.into_iter();
|
||||
// Get the transaction count for prewarming task
|
||||
// Use upper bound if available (more accurate), otherwise use lower bound
|
||||
let (lower, upper) = transactions.size_hint();
|
||||
let transaction_count_hint = upper.unwrap_or(lower);
|
||||
|
||||
// Spawn a task that iterates through all transactions in parallel and sends them to the
|
||||
// main task.
|
||||
let (tx, rx) = mpsc::channel();
|
||||
self.executor.spawn_blocking(move || {
|
||||
transactions.enumerate().par_bridge().for_each_with(tx, |sender, (idx, tx)| {
|
||||
let tx = convert(tx);
|
||||
let tx = tx.map(|tx| WithTxEnv { tx_env: tx.to_tx_env(), tx: Arc::new(tx) });
|
||||
let _ = sender.send((idx, tx));
|
||||
});
|
||||
});
|
||||
|
||||
// Spawn a task that processes out-of-order transactions from the task above and sends them
|
||||
// to prewarming and execution tasks.
|
||||
let (prewarm_tx, prewarm_rx) = mpsc::channel();
|
||||
let (execute_tx, execute_rx) = mpsc::channel();
|
||||
self.executor.spawn_blocking(move || {
|
||||
for tx in transactions {
|
||||
let tx = tx.map(|tx| WithTxEnv { tx_env: tx.to_tx_env(), tx: Arc::new(tx) });
|
||||
let mut next_for_execution = 0;
|
||||
let mut queue = BTreeMap::new();
|
||||
while let Ok((idx, tx)) = rx.recv() {
|
||||
// only send Ok(_) variants to prewarming task
|
||||
if let Ok(tx) = &tx {
|
||||
let _ = prewarm_tx.send(tx.clone());
|
||||
}
|
||||
let _ = execute_tx.send(tx);
|
||||
|
||||
if next_for_execution == idx {
|
||||
let _ = execute_tx.send(tx);
|
||||
next_for_execution += 1;
|
||||
|
||||
while let Some(entry) = queue.first_entry() &&
|
||||
*entry.key() == next_for_execution
|
||||
{
|
||||
let _ = execute_tx.send(entry.remove());
|
||||
next_for_execution += 1;
|
||||
}
|
||||
} else {
|
||||
queue.insert(idx, tx);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
@@ -351,9 +385,15 @@ where
|
||||
transactions = mpsc::channel().1;
|
||||
}
|
||||
|
||||
let saved_cache = self.cache_for(env.parent_hash);
|
||||
let cache = saved_cache.cache().clone();
|
||||
let cache_metrics = saved_cache.metrics().clone();
|
||||
let (saved_cache, cache, cache_metrics) = if self.disable_state_cache {
|
||||
(None, None, None)
|
||||
} else {
|
||||
let saved_cache = self.cache_for(env.parent_hash);
|
||||
let cache = saved_cache.cache().clone();
|
||||
let cache_metrics = saved_cache.metrics().clone();
|
||||
(Some(saved_cache), Some(cache), Some(cache_metrics))
|
||||
};
|
||||
|
||||
// configure prewarming
|
||||
let prewarm_ctx = PrewarmContext {
|
||||
env,
|
||||
@@ -565,12 +605,12 @@ impl<Tx, Err> PayloadHandle<Tx, Err> {
|
||||
}
|
||||
|
||||
/// Returns a clone of the caches used by prewarming
|
||||
pub(super) fn caches(&self) -> StateExecutionCache {
|
||||
pub(super) fn caches(&self) -> Option<StateExecutionCache> {
|
||||
self.prewarm_handle.cache.clone()
|
||||
}
|
||||
|
||||
/// Returns a clone of the cache metrics used by prewarming
|
||||
pub(super) fn cache_metrics(&self) -> CachedStateMetrics {
|
||||
pub(super) fn cache_metrics(&self) -> Option<CachedStateMetrics> {
|
||||
self.prewarm_handle.cache_metrics.clone()
|
||||
}
|
||||
|
||||
@@ -600,9 +640,9 @@ impl<Tx, Err> PayloadHandle<Tx, Err> {
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct CacheTaskHandle {
|
||||
/// The shared cache the task operates with.
|
||||
cache: StateExecutionCache,
|
||||
cache: Option<StateExecutionCache>,
|
||||
/// Metrics for the caches
|
||||
cache_metrics: CachedStateMetrics,
|
||||
cache_metrics: Option<CachedStateMetrics>,
|
||||
/// Channel to the spawned prewarm task if any
|
||||
to_prewarm_task: Option<std::sync::mpsc::Sender<PrewarmTaskEvent>>,
|
||||
}
|
||||
@@ -1017,13 +1057,19 @@ mod tests {
|
||||
|
||||
let provider_factory = BlockchainProvider::new(factory).unwrap();
|
||||
|
||||
let mut handle = payload_processor.spawn(
|
||||
Default::default(),
|
||||
core::iter::empty::<Result<Recovered<TransactionSigned>, core::convert::Infallible>>(),
|
||||
StateProviderBuilder::new(provider_factory.clone(), genesis_hash, None),
|
||||
OverlayStateProviderFactory::new(provider_factory),
|
||||
&TreeConfig::default(),
|
||||
);
|
||||
let mut handle =
|
||||
payload_processor.spawn(
|
||||
Default::default(),
|
||||
(
|
||||
core::iter::empty::<
|
||||
Result<Recovered<TransactionSigned>, core::convert::Infallible>,
|
||||
>(),
|
||||
std::convert::identity,
|
||||
),
|
||||
StateProviderBuilder::new(provider_factory.clone(), genesis_hash, None),
|
||||
OverlayStateProviderFactory::new(provider_factory),
|
||||
&TreeConfig::default(),
|
||||
);
|
||||
|
||||
let mut state_hook = handle.state_hook();
|
||||
|
||||
|
||||
@@ -29,7 +29,7 @@ use metrics::{Counter, Gauge, Histogram};
|
||||
use reth_evm::{execute::ExecutableTxFor, ConfigureEvm, Evm, EvmFor, SpecFor};
|
||||
use reth_metrics::Metrics;
|
||||
use reth_primitives_traits::NodePrimitives;
|
||||
use reth_provider::{BlockReader, StateProviderFactory, StateReader};
|
||||
use reth_provider::{BlockReader, StateProviderBox, StateProviderFactory, StateReader};
|
||||
use reth_revm::{database::StateProviderDatabase, db::BundleState, state::EvmState};
|
||||
use reth_trie::MultiProofTargets;
|
||||
use std::{
|
||||
@@ -255,31 +255,35 @@ where
|
||||
self;
|
||||
let hash = env.hash;
|
||||
|
||||
debug!(target: "engine::caching", parent_hash=?hash, "Updating execution cache");
|
||||
// Perform all cache operations atomically under the lock
|
||||
execution_cache.update_with_guard(|cached| {
|
||||
// consumes the `SavedCache` held by the prewarming task, which releases its usage guard
|
||||
let (caches, cache_metrics) = saved_cache.split();
|
||||
let new_cache = SavedCache::new(hash, caches, cache_metrics);
|
||||
if let Some(saved_cache) = saved_cache {
|
||||
debug!(target: "engine::caching", parent_hash=?hash, "Updating execution cache");
|
||||
// Perform all cache operations atomically under the lock
|
||||
execution_cache.update_with_guard(|cached| {
|
||||
// consumes the `SavedCache` held by the prewarming task, which releases its usage
|
||||
// guard
|
||||
let (caches, cache_metrics) = saved_cache.split();
|
||||
let new_cache = SavedCache::new(hash, caches, cache_metrics);
|
||||
|
||||
// Insert state into cache while holding the lock
|
||||
if new_cache.cache().insert_state(&state).is_err() {
|
||||
// Clear the cache on error to prevent having a polluted cache
|
||||
*cached = None;
|
||||
debug!(target: "engine::caching", "cleared execution cache on update error");
|
||||
return;
|
||||
}
|
||||
// Insert state into cache while holding the lock
|
||||
if new_cache.cache().insert_state(&state).is_err() {
|
||||
// Clear the cache on error to prevent having a polluted cache
|
||||
*cached = None;
|
||||
debug!(target: "engine::caching", "cleared execution cache on update error");
|
||||
return;
|
||||
}
|
||||
|
||||
new_cache.update_metrics();
|
||||
new_cache.update_metrics();
|
||||
|
||||
// Replace the shared cache with the new one; the previous cache (if any) is dropped.
|
||||
*cached = Some(new_cache);
|
||||
});
|
||||
// Replace the shared cache with the new one; the previous cache (if any) is
|
||||
// dropped.
|
||||
*cached = Some(new_cache);
|
||||
});
|
||||
|
||||
let elapsed = start.elapsed();
|
||||
debug!(target: "engine::caching", parent_hash=?hash, elapsed=?elapsed, "Updated execution cache");
|
||||
let elapsed = start.elapsed();
|
||||
debug!(target: "engine::caching", parent_hash=?hash, elapsed=?elapsed, "Updated execution cache");
|
||||
|
||||
metrics.cache_saving_duration.set(elapsed.as_secs_f64());
|
||||
metrics.cache_saving_duration.set(elapsed.as_secs_f64());
|
||||
}
|
||||
}
|
||||
|
||||
/// Executes the task.
|
||||
@@ -356,7 +360,7 @@ where
|
||||
{
|
||||
pub(super) env: ExecutionEnv<Evm>,
|
||||
pub(super) evm_config: Evm,
|
||||
pub(super) saved_cache: SavedCache,
|
||||
pub(super) saved_cache: Option<SavedCache>,
|
||||
/// Provider to obtain the state
|
||||
pub(super) provider: StateProviderBuilder<N, P>,
|
||||
pub(super) metrics: PrewarmMetrics,
|
||||
@@ -400,10 +404,13 @@ where
|
||||
};
|
||||
|
||||
// Use the caches to create a new provider with caching
|
||||
let caches = saved_cache.cache().clone();
|
||||
let cache_metrics = saved_cache.metrics().clone();
|
||||
let state_provider =
|
||||
CachedStateProvider::new_with_caches(state_provider, caches, cache_metrics);
|
||||
let state_provider: StateProviderBox = if let Some(saved_cache) = saved_cache {
|
||||
let caches = saved_cache.cache().clone();
|
||||
let cache_metrics = saved_cache.metrics().clone();
|
||||
Box::new(CachedStateProvider::new_with_caches(state_provider, caches, cache_metrics))
|
||||
} else {
|
||||
state_provider
|
||||
};
|
||||
|
||||
let state_provider = StateProviderDatabase::new(state_provider);
|
||||
|
||||
|
||||
@@ -213,16 +213,31 @@ where
|
||||
Evm: ConfigureEngineEvm<T::ExecutionData, Primitives = N>,
|
||||
{
|
||||
match input {
|
||||
BlockOrPayload::Payload(payload) => Ok(Either::Left(
|
||||
self.evm_config
|
||||
BlockOrPayload::Payload(payload) => {
|
||||
let (iter, convert) = self
|
||||
.evm_config
|
||||
.tx_iterator_for_payload(payload)
|
||||
.map_err(NewPayloadError::other)?
|
||||
.map(|res| res.map(Either::Left).map_err(NewPayloadError::other)),
|
||||
)),
|
||||
.into();
|
||||
|
||||
let iter = Either::Left(iter.into_iter().map(Either::Left));
|
||||
let convert = move |tx| {
|
||||
let Either::Left(tx) = tx else { unreachable!() };
|
||||
convert(tx).map(Either::Left).map_err(Either::Left)
|
||||
};
|
||||
|
||||
// Box the closure to satisfy the `Fn` bound both here and in the branch below
|
||||
Ok((iter, Box::new(convert) as Box<dyn Fn(_) -> _ + Send + Sync + 'static>))
|
||||
}
|
||||
BlockOrPayload::Block(block) => {
|
||||
Ok(Either::Right(block.body().clone_transactions().into_iter().map(|tx| {
|
||||
Ok(Either::Right(tx.try_into_recovered().map_err(NewPayloadError::other)?))
|
||||
})))
|
||||
let iter =
|
||||
Either::Right(block.body().clone_transactions().into_iter().map(Either::Right));
|
||||
let convert = move |tx: Either<_, N::SignedTx>| {
|
||||
let Either::Right(tx) = tx else { unreachable!() };
|
||||
tx.try_into_recovered().map(Either::Right).map_err(Either::Right)
|
||||
};
|
||||
|
||||
Ok((iter, Box::new(convert)))
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -353,7 +368,7 @@ where
|
||||
)
|
||||
.into())
|
||||
};
|
||||
let state_provider = ensure_ok!(provider_builder.build());
|
||||
let mut state_provider = ensure_ok!(provider_builder.build());
|
||||
drop(_enter);
|
||||
|
||||
// fetch parent block
|
||||
@@ -396,11 +411,13 @@ where
|
||||
|
||||
// Use cached state provider before executing, used in execution after prewarming threads
|
||||
// complete
|
||||
let state_provider = CachedStateProvider::new_with_caches(
|
||||
state_provider,
|
||||
handle.caches(),
|
||||
handle.cache_metrics(),
|
||||
);
|
||||
if let Some((caches, cache_metrics)) = handle.caches().zip(handle.cache_metrics()) {
|
||||
state_provider = Box::new(CachedStateProvider::new_with_caches(
|
||||
state_provider,
|
||||
caches,
|
||||
cache_metrics,
|
||||
));
|
||||
};
|
||||
|
||||
// Execute the block and handle any execution errors
|
||||
let (output, senders) = match if self.config.state_provider_metrics() {
|
||||
|
||||
@@ -289,12 +289,15 @@ where
|
||||
&self,
|
||||
payload: &ExecutionData,
|
||||
) -> Result<impl ExecutableTxIterator<Self>, Self::Error> {
|
||||
Ok(payload.payload.transactions().clone().into_iter().map(|tx| {
|
||||
let txs = payload.payload.transactions().clone().into_iter();
|
||||
let convert = |tx: Bytes| {
|
||||
let tx =
|
||||
TxTy::<Self::Primitives>::decode_2718_exact(tx.as_ref()).map_err(AnyError::new)?;
|
||||
let signer = tx.try_recover().map_err(AnyError::new)?;
|
||||
Ok::<_, AnyError>(tx.with_signer(signer))
|
||||
}))
|
||||
};
|
||||
|
||||
Ok((txs, convert))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -18,22 +18,51 @@ pub trait ConfigureEngineEvm<ExecutionData>: ConfigureEvm {
|
||||
) -> Result<impl ExecutableTxIterator<Self>, Self::Error>;
|
||||
}
|
||||
|
||||
/// Iterator over executable transactions.
|
||||
pub trait ExecutableTxIterator<Evm: ConfigureEvm>:
|
||||
Iterator<Item = Result<Self::Tx, Self::Error>> + Send + 'static
|
||||
{
|
||||
/// A helper trait representing a pair of a "raw" transactions iterator and a closure that can be
|
||||
/// used to convert them to an executable transaction. This tuple is used in the engine to
|
||||
/// parallelize heavy work like decoding or recovery.
|
||||
pub trait ExecutableTxTuple: Into<(Self::Iter, Self::Convert)> + Send + 'static {
|
||||
/// Raw transaction that can be converted to an [`ExecutableTxTuple::Tx`]
|
||||
///
|
||||
/// This can be any type that can be converted to an [`ExecutableTxTuple::Tx`]. For example,
|
||||
/// an unrecovered transaction or just the transaction bytes.
|
||||
type RawTx: Send + Sync + 'static;
|
||||
/// The executable transaction type iterator yields.
|
||||
type Tx: ExecutableTxFor<Evm> + Clone + Send + Sync + 'static;
|
||||
type Tx: Clone + Send + Sync + 'static;
|
||||
/// Errors that may occur while recovering or decoding transactions.
|
||||
type Error: core::error::Error + Send + Sync + 'static;
|
||||
|
||||
/// Iterator over [`ExecutableTxTuple::Tx`]
|
||||
type Iter: Iterator<Item = Self::RawTx> + Send + 'static;
|
||||
/// Closure that can be used to convert a [`ExecutableTxTuple::RawTx`] to a
|
||||
/// [`ExecutableTxTuple::Tx`]. This might involve heavy work like decoding or recovery
|
||||
/// and will be parallelized in the engine.
|
||||
type Convert: Fn(Self::RawTx) -> Result<Self::Tx, Self::Error> + Send + Sync + 'static;
|
||||
}
|
||||
|
||||
impl<Evm: ConfigureEvm, Tx, Err, T> ExecutableTxIterator<Evm> for T
|
||||
impl<RawTx, Tx, Err, I, F> ExecutableTxTuple for (I, F)
|
||||
where
|
||||
Tx: ExecutableTxFor<Evm> + Clone + Send + Sync + 'static,
|
||||
RawTx: Send + Sync + 'static,
|
||||
Tx: Clone + Send + Sync + 'static,
|
||||
Err: core::error::Error + Send + Sync + 'static,
|
||||
T: Iterator<Item = Result<Tx, Err>> + Send + 'static,
|
||||
I: Iterator<Item = RawTx> + Send + 'static,
|
||||
F: Fn(RawTx) -> Result<Tx, Err> + Send + Sync + 'static,
|
||||
{
|
||||
type RawTx = RawTx;
|
||||
type Tx = Tx;
|
||||
type Error = Err;
|
||||
|
||||
type Iter = I;
|
||||
type Convert = F;
|
||||
}
|
||||
|
||||
/// Iterator over executable transactions.
|
||||
pub trait ExecutableTxIterator<Evm: ConfigureEvm>:
|
||||
ExecutableTxTuple<Tx: ExecutableTxFor<Evm>>
|
||||
{
|
||||
}
|
||||
|
||||
impl<T, Evm: ConfigureEvm> ExecutableTxIterator<Evm> for T where
|
||||
T: ExecutableTxTuple<Tx: ExecutableTxFor<Evm>>
|
||||
{
|
||||
}
|
||||
|
||||
@@ -17,6 +17,14 @@ pub struct ExecutorMetrics {
|
||||
/// The Histogram for amount of gas used.
|
||||
pub gas_used_histogram: Histogram,
|
||||
|
||||
/// The Histogram for amount of time taken to execute the pre-execution changes.
|
||||
pub pre_execution_histogram: Histogram,
|
||||
/// The Histogram for amount of time taken to wait for one transaction to be available.
|
||||
pub transaction_wait_histogram: Histogram,
|
||||
/// The Histogram for amount of time taken to execute one transaction.
|
||||
pub transaction_execution_histogram: Histogram,
|
||||
/// The Histogram for amount of time taken to execute the post-execution changes.
|
||||
pub post_execution_histogram: Histogram,
|
||||
/// The Histogram for amount of time taken to execute blocks.
|
||||
pub execution_histogram: Histogram,
|
||||
/// The total amount of time it took to execute the latest block.
|
||||
|
||||
@@ -35,6 +35,10 @@ pub struct EngineArgs {
|
||||
#[deprecated]
|
||||
pub caching_and_prewarming_enabled: bool,
|
||||
|
||||
/// Disable state cache
|
||||
#[arg(long = "engine.disable-state-cache")]
|
||||
pub state_cache_disabled: bool,
|
||||
|
||||
/// Disable parallel prewarming
|
||||
#[arg(long = "engine.disable-prewarming", alias = "engine.disable-caching-and-prewarming")]
|
||||
pub prewarming_disabled: bool,
|
||||
@@ -130,6 +134,7 @@ impl Default for EngineArgs {
|
||||
legacy_state_root_task_enabled: false,
|
||||
state_root_task_compare_updates: false,
|
||||
caching_and_prewarming_enabled: true,
|
||||
state_cache_disabled: false,
|
||||
prewarming_disabled: false,
|
||||
parallel_sparse_trie_enabled: true,
|
||||
parallel_sparse_trie_disabled: false,
|
||||
@@ -157,6 +162,7 @@ impl EngineArgs {
|
||||
.with_persistence_threshold(self.persistence_threshold)
|
||||
.with_memory_block_buffer_target(self.memory_block_buffer_target)
|
||||
.with_legacy_state_root(self.legacy_state_root_task_enabled)
|
||||
.without_state_cache(self.state_cache_disabled)
|
||||
.without_prewarming(self.prewarming_disabled)
|
||||
.with_disable_parallel_sparse_trie(self.parallel_sparse_trie_disabled)
|
||||
.with_state_provider_metrics(self.state_provider_metrics)
|
||||
|
||||
@@ -16,7 +16,7 @@ use alloy_consensus::{BlockHeader, Header};
|
||||
use alloy_eips::Decodable2718;
|
||||
use alloy_evm::{EvmFactory, FromRecoveredTx, FromTxWithEncoded};
|
||||
use alloy_op_evm::block::{receipt_builder::OpReceiptBuilder, OpTxEnv};
|
||||
use alloy_primitives::U256;
|
||||
use alloy_primitives::{Bytes, U256};
|
||||
use core::fmt::Debug;
|
||||
use op_alloy_consensus::EIP1559ParamError;
|
||||
use op_alloy_rpc_types_engine::OpExecutionData;
|
||||
@@ -265,12 +265,15 @@ where
|
||||
&self,
|
||||
payload: &OpExecutionData,
|
||||
) -> Result<impl ExecutableTxIterator<Self>, Self::Error> {
|
||||
Ok(payload.payload.transactions().clone().into_iter().map(|encoded| {
|
||||
let transactions = payload.payload.transactions().clone().into_iter();
|
||||
let convert = |encoded: Bytes| {
|
||||
let tx = TxTy::<Self::Primitives>::decode_2718_exact(encoded.as_ref())
|
||||
.map_err(AnyError::new)?;
|
||||
let signer = tx.try_recover().map_err(AnyError::new)?;
|
||||
Ok::<_, AnyError>(WithEncoded::new(encoded, tx.with_signer(signer)))
|
||||
}))
|
||||
};
|
||||
|
||||
Ok((transactions, convert))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -885,6 +885,9 @@ Engine:
|
||||
--engine.legacy-state-root
|
||||
Enable legacy state root
|
||||
|
||||
--engine.disable-state-cache
|
||||
Disable state cache
|
||||
|
||||
--engine.disable-prewarming
|
||||
Disable parallel prewarming
|
||||
|
||||
|
||||
@@ -885,6 +885,9 @@ Engine:
|
||||
--engine.legacy-state-root
|
||||
Enable legacy state root
|
||||
|
||||
--engine.disable-state-cache
|
||||
Disable state cache
|
||||
|
||||
--engine.disable-prewarming
|
||||
Disable parallel prewarming
|
||||
|
||||
|
||||
@@ -25,6 +25,7 @@ use reth_op::{
|
||||
primitives::SignedTransaction,
|
||||
};
|
||||
use reth_rpc_api::eth::helpers::pending_block::BuildPendingEnv;
|
||||
use revm_primitives::Bytes;
|
||||
use std::sync::Arc;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
@@ -126,13 +127,15 @@ impl ConfigureEngineEvm<CustomExecutionData> for CustomEvmConfig {
|
||||
&self,
|
||||
payload: &CustomExecutionData,
|
||||
) -> Result<impl ExecutableTxIterator<Self>, Self::Error> {
|
||||
Ok(payload.inner.payload.transactions().clone().into_iter().map(|encoded| {
|
||||
let transactions = payload.inner.payload.transactions().clone().into_iter();
|
||||
let convert = |encoded: Bytes| {
|
||||
let tx = CustomTransaction::decode_2718_exact(encoded.as_ref())
|
||||
.map_err(Into::into)
|
||||
.map_err(PayloadError::Decode)?;
|
||||
let signer = tx.try_recover().map_err(NewPayloadError::other)?;
|
||||
Ok::<_, NewPayloadError>(WithEncoded::new(encoded, tx.with_signer(signer)))
|
||||
}))
|
||||
};
|
||||
Ok((transactions, convert))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user