merge main

Soubhik-10
2025-10-01 18:46:52 +05:30
34 changed files with 1201 additions and 656 deletions

Cargo.lock generated — 512 changed lines; file diff suppressed because it is too large.

-------- next file --------

@@ -1,5 +1,5 @@
[workspace.package]
version = "1.8.1"
version = "1.8.2"
edition = "2024"
rust-version = "1.88"
license = "MIT OR Apache-2.0"
@@ -487,33 +487,33 @@ alloy-trie = { version = "0.9.1", default-features = false }
alloy-hardforks = "0.3.5"
alloy-consensus = { version = "1.0.35", default-features = false }
alloy-contract = { version = "1.0.35", default-features = false }
alloy-eips = { version = "1.0.35", default-features = false }
alloy-genesis = { version = "1.0.35", default-features = false }
alloy-json-rpc = { version = "1.0.35", default-features = false }
alloy-network = { version = "1.0.35", default-features = false }
alloy-network-primitives = { version = "1.0.35", default-features = false }
alloy-provider = { version = "1.0.35", features = ["reqwest"], default-features = false }
alloy-pubsub = { version = "1.0.35", default-features = false }
alloy-rpc-client = { version = "1.0.35", default-features = false }
alloy-rpc-types = { version = "1.0.35", features = ["eth"], default-features = false }
alloy-rpc-types-admin = { version = "1.0.35", default-features = false }
alloy-rpc-types-anvil = { version = "1.0.35", default-features = false }
alloy-rpc-types-beacon = { version = "1.0.35", default-features = false }
alloy-rpc-types-debug = { version = "1.0.35", default-features = false }
alloy-rpc-types-engine = { version = "1.0.35", default-features = false }
alloy-rpc-types-eth = { version = "1.0.35", default-features = false }
alloy-rpc-types-mev = { version = "1.0.35", default-features = false }
alloy-rpc-types-trace = { version = "1.0.35", default-features = false }
alloy-rpc-types-txpool = { version = "1.0.35", default-features = false }
alloy-serde = { version = "1.0.35", default-features = false }
alloy-signer = { version = "1.0.35", default-features = false }
alloy-signer-local = { version = "1.0.35", default-features = false }
alloy-transport = { version = "1.0.35" }
alloy-transport-http = { version = "1.0.35", features = ["reqwest-rustls-tls"], default-features = false }
alloy-transport-ipc = { version = "1.0.35", default-features = false }
alloy-transport-ws = { version = "1.0.35", default-features = false }
alloy-consensus = { version = "1.0.37", default-features = false }
alloy-contract = { version = "1.0.37", default-features = false }
alloy-eips = { version = "1.0.37", default-features = false }
alloy-genesis = { version = "1.0.37", default-features = false }
alloy-json-rpc = { version = "1.0.37", default-features = false }
alloy-network = { version = "1.0.37", default-features = false }
alloy-network-primitives = { version = "1.0.37", default-features = false }
alloy-provider = { version = "1.0.37", features = ["reqwest"], default-features = false }
alloy-pubsub = { version = "1.0.37", default-features = false }
alloy-rpc-client = { version = "1.0.37", default-features = false }
alloy-rpc-types = { version = "1.0.37", features = ["eth"], default-features = false }
alloy-rpc-types-admin = { version = "1.0.37", default-features = false }
alloy-rpc-types-anvil = { version = "1.0.37", default-features = false }
alloy-rpc-types-beacon = { version = "1.0.37", default-features = false }
alloy-rpc-types-debug = { version = "1.0.37", default-features = false }
alloy-rpc-types-engine = { version = "1.0.37", default-features = false }
alloy-rpc-types-eth = { version = "1.0.37", default-features = false }
alloy-rpc-types-mev = { version = "1.0.37", default-features = false }
alloy-rpc-types-trace = { version = "1.0.37", default-features = false }
alloy-rpc-types-txpool = { version = "1.0.37", default-features = false }
alloy-serde = { version = "1.0.37", default-features = false }
alloy-signer = { version = "1.0.37", default-features = false }
alloy-signer-local = { version = "1.0.37", default-features = false }
alloy-transport = { version = "1.0.37" }
alloy-transport-http = { version = "1.0.37", features = ["reqwest-rustls-tls"], default-features = false }
alloy-transport-ipc = { version = "1.0.37", default-features = false }
alloy-transport-ws = { version = "1.0.37", default-features = false }
# op
alloy-op-evm = { version = "0.21.0", default-features = false }

-------- next file --------

@@ -66,6 +66,12 @@ impl<F> FnLauncher<F> {
}
}
impl<F> fmt::Debug for FnLauncher<F> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("FnLauncher").field("func", &"<function>").finish()
}
}
impl<C, Ext, F> Launcher<C, Ext> for FnLauncher<F>
where
C: ChainSpecParser,

-------- next file --------

@@ -82,11 +82,6 @@ impl<I> Setup<I>
where
I: EngineTypes,
{
/// Create a new setup with default values
pub fn new() -> Self {
Self::default()
}
/// Set the chain specification
pub fn with_chain_spec(mut self, chain_spec: Arc<ChainSpec>) -> Self {
self.chain_spec = Some(chain_spec);

-------- next file --------

@@ -194,7 +194,7 @@ where
/// through newPayload.
async fn advance(&mut self) -> eyre::Result<()> {
let timestamp = std::cmp::max(
self.last_timestamp + 1,
self.last_timestamp.saturating_add(1),
std::time::SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("cannot be earlier than UNIX_EPOCH")

-------- next file --------

@@ -260,7 +260,7 @@ where
}
}
/// Spawn cache prewarming exclusively.
/// Spawns a task that exclusively handles cache prewarming for transaction execution.
///
/// Returns a [`PayloadHandle`] to communicate with the task.
pub(super) fn spawn_cache_exclusive<P, I: ExecutableTxIterator<Evm>>(

-------- next file --------

@@ -452,7 +452,7 @@ where
"Starting dedicated storage proof calculation",
);
let start = Instant::now();
let result = ParallelProof::new(
let proof_result = ParallelProof::new(
config.consistent_view,
config.nodes_sorted,
config.state_sorted,
@@ -461,7 +461,7 @@ where
)
.with_branch_node_masks(true)
.with_multi_added_removed_keys(Some(multi_added_removed_keys))
.decoded_storage_proof(hashed_address, proof_targets);
.storage_proof(hashed_address, proof_targets);
let elapsed = start.elapsed();
trace!(
target: "engine::root",
@@ -472,7 +472,7 @@ where
"Storage multiproofs calculated",
);
match result {
match proof_result {
Ok(proof) => {
let _ = state_root_message_sender.send(MultiProofMessage::ProofCalculated(
Box::new(ProofCalculated {
@@ -527,7 +527,7 @@ where
);
let start = Instant::now();
let result = ParallelProof::new(
let proof_result = ParallelProof::new(
config.consistent_view,
config.nodes_sorted,
config.state_sorted,
@@ -548,7 +548,7 @@ where
"Multiproof calculated",
);
match result {
match proof_result {
Ok(proof) => {
let _ = state_root_message_sender.send(MultiProofMessage::ProofCalculated(
Box::new(ProofCalculated {

-------- next file --------

@@ -329,6 +329,7 @@ where
Evm: ConfigureEngineEvm<T::ExecutionData, Primitives = N>,
{
/// A helper macro that returns the block in case there was an error
/// This macro is used for early returns before block conversion
macro_rules! ensure_ok {
($expr:expr) => {
match $expr {
@@ -343,6 +344,20 @@ where
};
}
/// A helper macro for handling errors after the input has been converted to a block
macro_rules! ensure_ok_post_block {
($expr:expr, $block:expr) => {
match $expr {
Ok(val) => val,
Err(e) => {
return Err(
InsertBlockError::new($block.into_sealed_block(), e.into()).into()
)
}
}
};
}
let parent_hash = input.parent_hash();
let block_num_hash = input.num_hash();
@@ -374,105 +389,35 @@ where
let env = ExecutionEnv { evm_env, hash: input.hash(), parent_hash: input.parent_hash() };
// We only run the parallel state root if we are not currently persisting any blocks or
// persisting blocks that are all ancestors of the one we are executing.
//
// If we're committing ancestor blocks, then: any trie updates being committed are a subset
// of the in-memory trie updates collected before fetching reverts. So any diff in
// reverts (pre vs post commit) is already covered by the in-memory trie updates we
// collect in `compute_state_root_parallel`.
//
// See https://github.com/paradigmxyz/reth/issues/12688 for more details
let persisting_kind = ctx.persisting_kind_for(input.block_with_parent());
// don't run parallel if state root fallback is set
let run_parallel_state_root =
persisting_kind.can_run_parallel_state_root() && !self.config.state_root_fallback();
// Use state root task only if:
// 1. No persistence is in progress
// 2. Config allows it
// 3. No ancestors with missing trie updates. If any exist, it will mean that every state
// root task proof calculation will include a lot of unrelated paths in the prefix sets.
// It's cheaper to run a parallel state root that does one walk over trie tables while
// accounting for the prefix sets.
// Plan the strategy used for state root computation.
let state_root_plan = self.plan_state_root_computation(&input, &ctx);
let persisting_kind = state_root_plan.persisting_kind;
let has_ancestors_with_missing_trie_updates =
self.has_ancestors_with_missing_trie_updates(input.block_with_parent(), ctx.state());
let mut use_state_root_task = run_parallel_state_root &&
self.config.use_state_root_task() &&
!has_ancestors_with_missing_trie_updates;
state_root_plan.has_ancestors_with_missing_trie_updates;
let strategy = state_root_plan.strategy;
debug!(
target: "engine::tree",
block=?block_num_hash,
run_parallel_state_root,
has_ancestors_with_missing_trie_updates,
use_state_root_task,
config_allows_state_root_task=self.config.use_state_root_task(),
?strategy,
?has_ancestors_with_missing_trie_updates,
"Deciding which state root algorithm to run"
);
// use prewarming background task
let txs = self.tx_iterator_for(&input)?;
let mut handle = if use_state_root_task {
// use background tasks for state root calc
let consistent_view =
ensure_ok!(ConsistentDbView::new_with_latest_tip(self.provider.clone()));
// get allocated trie input if it exists
let allocated_trie_input = self.payload_processor.take_trie_input();
// Compute trie input
let trie_input_start = Instant::now();
let trie_input = ensure_ok!(self.compute_trie_input(
persisting_kind,
ensure_ok!(consistent_view.provider_ro()),
parent_hash,
ctx.state(),
allocated_trie_input,
));
self.metrics
.block_validation
.trie_input_duration
.record(trie_input_start.elapsed().as_secs_f64());
// Use state root task only if prefix sets are empty, otherwise proof generation is too
// expensive because it requires walking over the paths in the prefix set in every
// proof.
let spawn_payload_processor_start = Instant::now();
let handle = if trie_input.prefix_sets.is_empty() {
self.payload_processor.spawn(
env.clone(),
txs,
provider_builder,
consistent_view,
trie_input,
&self.config,
)
} else {
debug!(target: "engine::tree", block=?block_num_hash, "Disabling state root task due to non-empty prefix sets");
use_state_root_task = false;
self.payload_processor.spawn_cache_exclusive(env.clone(), txs, provider_builder)
};
// record prewarming initialization duration
self.metrics
.block_validation
.spawn_payload_processor
.record(spawn_payload_processor_start.elapsed().as_secs_f64());
handle
} else {
let prewarming_start = Instant::now();
let handle =
self.payload_processor.spawn_cache_exclusive(env.clone(), txs, provider_builder);
// Record prewarming initialization duration
self.metrics
.block_validation
.spawn_payload_processor
.record(prewarming_start.elapsed().as_secs_f64());
handle
};
// Spawn the appropriate processor based on strategy
let (mut handle, strategy) = ensure_ok!(self.spawn_payload_processor(
env.clone(),
txs,
provider_builder,
persisting_kind,
parent_hash,
ctx.state(),
block_num_hash,
strategy,
));
// Use cached state provider before executing, used in execution after prewarming threads
// complete
@@ -500,70 +445,10 @@ where
let block = self.convert_to_block(input)?;
// if let (Some(executed_bal), Some(block_bal)) =
// (output.result.block_access_list.as_ref(), block.body().block_access_list())
// {
// if !alloy_evm::eth::utils::validate_block_access_list_against_execution(block_bal) ||
// block_bal.as_slice() != executed_bal.as_slice()
// {
// tracing::debug!(
// "BlockAccessList mismatch!\n block BAL = {:?}\n executed BAL = {:?}",
// block_bal,
// executed_bal
// );
// return Err(InsertBlockError::new(
// block.into_sealed_block(),
// ConsensusError::BlockAccessListMismatch.into(),
// )
// .into());
// }
// }
// A helper macro that returns the block in case there was an error
macro_rules! ensure_ok {
($expr:expr) => {
match $expr {
Ok(val) => val,
Err(e) => return Err(InsertBlockError::new(block.into_sealed_block(), e.into()).into()),
}
};
}
let post_execution_start = Instant::now();
trace!(target: "engine::tree", block=?block_num_hash, "Validating block consensus");
// validate block consensus rules
ensure_ok!(self.validate_block_inner(&block));
// now validate against the parent
if let Err(e) =
self.consensus.validate_header_against_parent(block.sealed_header(), &parent_block)
{
warn!(target: "engine::tree", ?block, "Failed to validate header {} against parent: {e}", block.hash());
return Err(InsertBlockError::new(block.into_sealed_block(), e.into()).into())
}
if let Err(err) = self.consensus.validate_block_post_execution(&block, &output) {
// call post-block hook
self.on_invalid_block(&parent_block, &block, &output, None, ctx.state_mut());
return Err(InsertBlockError::new(block.into_sealed_block(), err.into()).into())
}
let hashed_state = self.provider.hashed_post_state(&output.state);
if let Err(err) =
self.validator.validate_block_post_execution_with_hashed_state(&hashed_state, &block)
{
// call post-block hook
self.on_invalid_block(&parent_block, &block, &output, None, ctx.state_mut());
return Err(InsertBlockError::new(block.into_sealed_block(), err.into()).into())
}
// record post-execution validation duration
self.metrics
.block_validation
.post_execution_validation_duration
.record(post_execution_start.elapsed().as_secs_f64());
let hashed_state = ensure_ok_post_block!(
self.validate_post_execution(&block, &parent_block, &output, &mut ctx),
block
);
debug!(target: "engine::tree", block=?block_num_hash, "Calculating block state root");
@@ -571,10 +456,8 @@ where
let mut maybe_state_root = None;
if run_parallel_state_root {
// if we new payload extends the current canonical change we attempt to use the
// background task or try to compute it in parallel
if use_state_root_task {
match strategy {
StateRootStrategy::StateRootTask => {
debug!(target: "engine::tree", block=?block_num_hash, "Using sparse trie state root algorithm");
match handle.state_root() {
Ok(StateRootComputeOutcome { state_root, trie_updates }) => {
@@ -596,7 +479,8 @@ where
debug!(target: "engine::tree", %error, "State root task failed");
}
}
} else {
}
StateRootStrategy::Parallel => {
debug!(target: "engine::tree", block=?block_num_hash, "Using parallel state root algorithm");
match self.compute_state_root_parallel(
persisting_kind,
@@ -618,8 +502,12 @@ where
}
}
}
StateRootStrategy::Synchronous => {}
}
// Determine the state root.
// If the state root was computed in parallel, we use it.
// Otherwise, we fall back to computing it synchronously.
let (state_root, trie_output, root_elapsed) = if let Some(maybe_state_root) =
maybe_state_root
{
@@ -633,8 +521,10 @@ where
self.metrics.block_validation.state_root_parallel_fallback_total.increment(1);
}
let (root, updates) =
ensure_ok!(state_provider.state_root_with_updates(hashed_state.clone()));
let (root, updates) = ensure_ok_post_block!(
state_provider.state_root_with_updates(hashed_state.clone()),
block
);
(root, updates, root_time.elapsed())
};
@@ -673,7 +563,7 @@ where
//
// Instead, they will be recomputed on persistence.
let connects_to_last_persisted =
ensure_ok!(self.block_connects_to_last_persisted(ctx, &block));
ensure_ok_post_block!(self.block_connects_to_last_persisted(ctx, &block), block);
let should_discard_trie_updates =
!connects_to_last_persisted || has_ancestors_with_missing_trie_updates;
debug!(
@@ -869,6 +759,170 @@ where
Ok(connects)
}
/// Validates the block after execution.
///
/// This performs:
/// - parent header validation
/// - post-execution consensus validation
/// - state-root based post-execution validation
fn validate_post_execution<T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>>(
&self,
block: &RecoveredBlock<N::Block>,
parent_block: &SealedHeader<N::BlockHeader>,
output: &BlockExecutionOutput<N::Receipt>,
ctx: &mut TreeCtx<'_, N>,
) -> Result<HashedPostState, InsertBlockErrorKind>
where
V: PayloadValidator<T, Block = N::Block>,
{
let start = Instant::now();
trace!(target: "engine::tree", block=?block.num_hash(), "Validating block consensus");
// validate block consensus rules
if let Err(e) = self.validate_block_inner(block) {
return Err(e.into())
}
// now validate against the parent
if let Err(e) =
self.consensus.validate_header_against_parent(block.sealed_header(), parent_block)
{
warn!(target: "engine::tree", ?block, "Failed to validate header {} against parent: {e}", block.hash());
return Err(e.into())
}
if let Err(err) = self.consensus.validate_block_post_execution(block, output) {
// call post-block hook
self.on_invalid_block(parent_block, block, output, None, ctx.state_mut());
return Err(err.into())
}
let hashed_state = self.provider.hashed_post_state(&output.state);
if let Err(err) =
self.validator.validate_block_post_execution_with_hashed_state(&hashed_state, block)
{
// call post-block hook
self.on_invalid_block(parent_block, block, output, None, ctx.state_mut());
return Err(err.into())
}
// record post-execution validation duration
self.metrics
.block_validation
.post_execution_validation_duration
.record(start.elapsed().as_secs_f64());
Ok(hashed_state)
}
/// Spawns a payload processor task based on the state root strategy.
///
/// This method determines how to execute the block and compute its state root based on
/// the selected strategy:
/// - `StateRootTask`: Uses a dedicated task for state root computation with proof generation
/// - `Parallel`: Computes state root in parallel with block execution
/// - `Synchronous`: Falls back to sequential execution and state root computation
///
/// The method handles strategy fallbacks if the preferred approach fails, ensuring
/// block execution always completes with a valid state root.
#[allow(clippy::too_many_arguments)]
fn spawn_payload_processor<T: ExecutableTxIterator<Evm>>(
&mut self,
env: ExecutionEnv<Evm>,
txs: T,
provider_builder: StateProviderBuilder<N, P>,
persisting_kind: PersistingKind,
parent_hash: B256,
state: &EngineApiTreeState<N>,
block_num_hash: NumHash,
strategy: StateRootStrategy,
) -> Result<
(
PayloadHandle<
impl ExecutableTxFor<Evm> + use<N, P, Evm, V, T>,
impl core::error::Error + Send + Sync + 'static + use<N, P, Evm, V, T>,
>,
StateRootStrategy,
),
InsertBlockErrorKind,
> {
match strategy {
StateRootStrategy::StateRootTask => {
// use background tasks for state root calc
let consistent_view = ConsistentDbView::new_with_latest_tip(self.provider.clone())?;
// get allocated trie input if it exists
let allocated_trie_input = self.payload_processor.take_trie_input();
// Compute trie input
let trie_input_start = Instant::now();
let trie_input = self.compute_trie_input(
persisting_kind,
consistent_view.provider_ro()?,
parent_hash,
state,
allocated_trie_input,
)?;
self.metrics
.block_validation
.trie_input_duration
.record(trie_input_start.elapsed().as_secs_f64());
// Use state root task only if prefix sets are empty, otherwise proof generation is
// too expensive because it requires walking all paths in every proof.
let spawn_start = Instant::now();
let (handle, strategy) = if trie_input.prefix_sets.is_empty() {
(
self.payload_processor.spawn(
env,
txs,
provider_builder,
consistent_view,
trie_input,
&self.config,
),
StateRootStrategy::StateRootTask,
)
// if prefix sets are not empty, we spawn a task that exclusively handles cache
// prewarming for transaction execution
} else {
debug!(
target: "engine::tree",
block=?block_num_hash,
"Disabling state root task due to non-empty prefix sets"
);
(
self.payload_processor.spawn_cache_exclusive(env, txs, provider_builder),
StateRootStrategy::Parallel,
)
};
// record prewarming initialization duration
self.metrics
.block_validation
.spawn_payload_processor
.record(spawn_start.elapsed().as_secs_f64());
Ok((handle, strategy))
}
strategy @ (StateRootStrategy::Parallel | StateRootStrategy::Synchronous) => {
let start = Instant::now();
let handle =
self.payload_processor.spawn_cache_exclusive(env, txs, provider_builder);
// Record prewarming initialization duration
self.metrics
.block_validation
.spawn_payload_processor
.record(start.elapsed().as_secs_f64());
Ok((handle, strategy))
}
}
}
/// Check if the given block has any ancestors with missing trie updates.
fn has_ancestors_with_missing_trie_updates(
&self,
@@ -921,6 +975,58 @@ where
Ok(None)
}
/// Determines the state root computation strategy based on persistence state and configuration.
fn plan_state_root_computation<T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>>(
&self,
input: &BlockOrPayload<T>,
ctx: &TreeCtx<'_, N>,
) -> StateRootPlan {
// We only run the parallel state root if we are not currently persisting any blocks or
// persisting blocks that are all ancestors of the one we are executing.
//
// If we're committing ancestor blocks, then: any trie updates being committed are a subset
// of the in-memory trie updates collected before fetching reverts. So any diff in
// reverts (pre vs post commit) is already covered by the in-memory trie updates we
// collect in `compute_state_root_parallel`.
//
// See https://github.com/paradigmxyz/reth/issues/12688 for more details
let persisting_kind = ctx.persisting_kind_for(input.block_with_parent());
let can_run_parallel =
persisting_kind.can_run_parallel_state_root() && !self.config.state_root_fallback();
// Check for ancestors with missing trie updates
let has_ancestors_with_missing_trie_updates =
self.has_ancestors_with_missing_trie_updates(input.block_with_parent(), ctx.state());
// Decide on the strategy.
// Use state root task only if:
// 1. No persistence is in progress
// 2. Config allows it
// 3. No ancestors with missing trie updates. If any exist, it will mean that every state
// root task proof calculation will include a lot of unrelated paths in the prefix sets.
// It's cheaper to run a parallel state root that does one walk over trie tables while
// accounting for the prefix sets.
let strategy = if can_run_parallel {
if self.config.use_state_root_task() && !has_ancestors_with_missing_trie_updates {
StateRootStrategy::StateRootTask
} else {
StateRootStrategy::Parallel
}
} else {
StateRootStrategy::Synchronous
};
debug!(
target: "engine::tree",
block=?input.num_hash(),
?strategy,
has_ancestors_with_missing_trie_updates,
"Planned state root computation strategy"
);
StateRootPlan { strategy, has_ancestors_with_missing_trie_updates, persisting_kind }
}
/// Called when an invalid block is encountered during validation.
fn on_invalid_block(
&self,
@@ -1045,6 +1151,27 @@ where
pub type ValidationOutcome<N, E = InsertPayloadError<BlockTy<N>>> =
Result<ExecutedBlockWithTrieUpdates<N>, E>;
/// Strategy describing how to compute the state root.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum StateRootStrategy {
/// Use the state root task (background sparse trie computation).
StateRootTask,
/// Run the parallel state root computation on the calling thread.
Parallel,
/// Fall back to synchronous computation via the state provider.
Synchronous,
}
/// State root computation plan that captures strategy and required data.
struct StateRootPlan {
/// Strategy that should be attempted for computing the state root.
strategy: StateRootStrategy,
/// Whether ancestors have missing trie updates.
has_ancestors_with_missing_trie_updates: bool,
/// The persisting kind for this block.
persisting_kind: PersistingKind,
}
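// Condensed restatement (sketch, not part of this change) of the decision
// logic in `plan_state_root_computation` above:
fn choose_strategy(
    can_run_parallel: bool,
    config_allows_task: bool,
    has_missing_trie_updates: bool,
) -> StateRootStrategy {
    match (can_run_parallel, config_allows_task && !has_missing_trie_updates) {
        // Persistence conflicts or forced fallback: compute synchronously.
        (false, _) => StateRootStrategy::Synchronous,
        // Parallel is allowed and the background task is viable.
        (true, true) => StateRootStrategy::StateRootTask,
        // Parallel is allowed, but the task would walk too many unrelated paths.
        (true, false) => StateRootStrategy::Parallel,
    }
}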
/// Type that validates the payloads processed by the engine.
///
/// This provides the necessary functions for validating/executing payloads/blocks.

-------- next file --------

@@ -1,5 +1,11 @@
use super::*;
use crate::persistence::PersistenceAction;
use crate::{
persistence::PersistenceAction,
tree::{
payload_validator::{BasicEngineValidator, TreeCtx, ValidationOutcome},
TreeConfig,
},
};
use alloy_consensus::Header;
use alloy_eips::eip1898::BlockWithParent;
use alloy_primitives::{
@@ -24,7 +30,10 @@ use reth_trie::HashedPostState;
use std::{
collections::BTreeMap,
str::FromStr,
sync::mpsc::{channel, Sender},
sync::{
mpsc::{channel, Receiver, Sender},
Arc,
},
};
use tokio::sync::oneshot;
@@ -338,6 +347,143 @@ impl TestHarness {
}
}
/// Simplified test metrics for validation calls
#[derive(Debug, Default)]
struct TestMetrics {
/// Count of successful `validate_block_direct` calls
validation_calls: usize,
/// Count of validation errors
validation_errors: usize,
}
impl TestMetrics {
fn record_validation(&mut self, success: bool) {
if success {
self.validation_calls += 1;
} else {
self.validation_errors += 1;
}
}
fn total_calls(&self) -> usize {
self.validation_calls + self.validation_errors
}
}
/// Extended test harness with direct `validate_block_with_state` access
pub(crate) struct ValidatorTestHarness {
/// Basic test harness
harness: TestHarness,
/// Direct access to validator for `validate_block_with_state` calls
validator: BasicEngineValidator<MockEthProvider, MockEvmConfig, MockEngineValidator>,
/// Simple validation metrics
metrics: TestMetrics,
}
impl ValidatorTestHarness {
fn new(chain_spec: Arc<ChainSpec>) -> Self {
let harness = TestHarness::new(chain_spec.clone());
// Create validator identical to the one in TestHarness
let consensus = Arc::new(EthBeaconConsensus::new(chain_spec));
let provider = harness.provider.clone();
let payload_validator = MockEngineValidator;
let evm_config = MockEvmConfig::default();
let validator = BasicEngineValidator::new(
provider,
consensus,
evm_config,
payload_validator,
TreeConfig::default(),
Box::new(NoopInvalidBlockHook::default()),
);
Self { harness, validator, metrics: TestMetrics::default() }
}
/// Configure `PersistenceState` for specific `PersistingKind` scenarios
fn start_persistence_operation(&mut self, action: CurrentPersistenceAction) {
use crate::tree::persistence_state::CurrentPersistenceAction;
use tokio::sync::oneshot;
// Create a dummy receiver for testing - it will never receive a value
let (_tx, rx) = oneshot::channel();
match action {
CurrentPersistenceAction::SavingBlocks { highest } => {
self.harness.tree.persistence_state.start_save(highest, rx);
}
CurrentPersistenceAction::RemovingBlocks { new_tip_num } => {
self.harness.tree.persistence_state.start_remove(new_tip_num, rx);
}
}
}
/// Check if persistence is currently in progress
fn is_persistence_in_progress(&self) -> bool {
self.harness.tree.persistence_state.in_progress()
}
/// Call `validate_block_with_state` directly with block
fn validate_block_direct(
&mut self,
block: RecoveredBlock<Block>,
) -> ValidationOutcome<EthPrimitives> {
let ctx = TreeCtx::new(
&mut self.harness.tree.state,
&self.harness.tree.persistence_state,
&self.harness.tree.canonical_in_memory_state,
);
let result = self.validator.validate_block(block, ctx);
self.metrics.record_validation(result.is_ok());
result
}
/// Get validation metrics for testing
fn validation_call_count(&self) -> usize {
self.metrics.total_calls()
}
}
/// Factory for creating test blocks with controllable properties
struct TestBlockFactory {
builder: TestBlockBuilder,
}
impl TestBlockFactory {
fn new(chain_spec: ChainSpec) -> Self {
Self { builder: TestBlockBuilder::eth().with_chain_spec(chain_spec) }
}
/// Create block that triggers consensus violation by corrupting state root
fn create_invalid_consensus_block(&mut self, parent_hash: B256) -> RecoveredBlock<Block> {
let mut block = self.builder.generate_random_block(1, parent_hash).into_block();
// Corrupt state root to trigger consensus violation
block.header.state_root = B256::random();
block.seal_slow().try_recover().unwrap()
}
/// Create block that triggers execution failure
fn create_invalid_execution_block(&mut self, parent_hash: B256) -> RecoveredBlock<Block> {
let mut block = self.builder.generate_random_block(1, parent_hash).into_block();
// Create transaction that will fail execution
// This is simplified - in practice we'd create a transaction with insufficient gas, etc.
block.header.gas_used = block.header.gas_limit + 1; // Gas used exceeds limit
block.seal_slow().try_recover().unwrap()
}
/// Create valid block
fn create_valid_block(&mut self, parent_hash: B256) -> RecoveredBlock<Block> {
let block = self.builder.generate_random_block(1, parent_hash).into_block();
block.seal_slow().try_recover().unwrap()
}
}
#[test]
fn test_tree_persist_block_batch() {
let tree_config = TreeConfig::default();
@@ -467,7 +613,9 @@ fn test_disconnected_payload() {
let block = Block::decode(&mut data.as_ref()).unwrap();
let sealed = block.seal_slow();
let hash = sealed.hash();
let payload = ExecutionPayloadV1::from_block_unchecked(hash, &sealed.clone().into_block());
let sealed_clone = sealed.clone();
let block = sealed.into_block();
let payload = ExecutionPayloadV1::from_block_unchecked(hash, &block);
let mut test_harness = TestHarness::new(HOLESKY.clone());
@@ -482,7 +630,7 @@ fn test_disconnected_payload() {
// ensure block is buffered
let buffered = test_harness.tree.state.buffer.block(&hash).unwrap();
assert_eq!(buffered.clone_sealed_block(), sealed);
assert_eq!(buffered.clone_sealed_block(), sealed_clone);
}
#[test]
@@ -510,8 +658,9 @@ async fn test_holesky_payload() {
let data = Bytes::from_str(s).unwrap();
let block: Block = Block::decode(&mut data.as_ref()).unwrap();
let sealed = block.seal_slow();
let payload =
ExecutionPayloadV1::from_block_unchecked(sealed.hash(), &sealed.clone().into_block());
let hash = sealed.hash();
let block = sealed.into_block();
let payload = ExecutionPayloadV1::from_block_unchecked(hash, &block);
let mut test_harness =
TestHarness::new(HOLESKY.clone()).with_backfill_state(BackfillSyncState::Active);
@@ -965,7 +1114,9 @@ fn test_on_new_payload_canonical_insertion() {
let block1 = Block::decode(&mut data.as_ref()).unwrap();
let sealed1 = block1.seal_slow();
let hash1 = sealed1.hash();
let payload1 = ExecutionPayloadV1::from_block_unchecked(hash1, &sealed1.clone().into_block());
let sealed1_clone = sealed1.clone();
let block1 = sealed1.into_block();
let payload1 = ExecutionPayloadV1::from_block_unchecked(hash1, &block1);
let mut test_harness = TestHarness::new(HOLESKY.clone());
@@ -986,7 +1137,7 @@ fn test_on_new_payload_canonical_insertion() {
// Ensure block is buffered (like test_disconnected_payload)
let buffered = test_harness.tree.state.buffer.block(&hash1).unwrap();
assert_eq!(buffered.clone_sealed_block(), sealed1, "Block should be buffered");
assert_eq!(buffered.clone_sealed_block(), sealed1_clone, "Block should be buffered");
}
/// Test that ensures payloads are rejected when linking to a known-invalid ancestor
@@ -1063,8 +1214,9 @@ fn test_on_new_payload_backfill_buffering() {
let data = Bytes::from_str(s).unwrap();
let block = Block::decode(&mut data.as_ref()).unwrap();
let sealed = block.seal_slow();
let payload =
ExecutionPayloadV1::from_block_unchecked(sealed.hash(), &sealed.clone().into_block());
let hash = sealed.hash();
let block = sealed.clone().into_block();
let payload = ExecutionPayloadV1::from_block_unchecked(hash, &block);
// Initialize test harness with backfill sync active
let mut test_harness =
@@ -1146,6 +1298,152 @@ fn test_on_new_payload_malformed_payload() {
}
}
/// Test different `StateRootStrategy` paths: `StateRootTask` with empty/non-empty prefix sets,
/// `Parallel`, `Synchronous`
#[test]
fn test_state_root_strategy_paths() {
reth_tracing::init_test_tracing();
let mut test_harness = TestHarness::new(MAINNET.clone());
// Test multiple scenarios to ensure different StateRootStrategy paths are taken:
// 1. `StateRootTask` with empty prefix_sets → uses payload_processor.spawn()
// 2. `StateRootTask` with non-empty prefix_sets → switches to `Parallel`, uses
// spawn_cache_exclusive()
// 3. `Parallel` strategy → uses spawn_cache_exclusive()
// 4. `Synchronous` strategy → uses spawn_cache_exclusive()
let s1 = include_str!("../../test-data/holesky/1.rlp");
let data1 = Bytes::from_str(s1).unwrap();
let block1 = Block::decode(&mut data1.as_ref()).unwrap();
let sealed1 = block1.seal_slow();
let hash1 = sealed1.hash();
let block1 = sealed1.into_block();
let payload1 = ExecutionPayloadV1::from_block_unchecked(hash1, &block1);
// Scenario 1: Test one strategy path
let outcome1 = test_harness
.tree
.on_new_payload(ExecutionData {
payload: payload1.into(),
sidecar: ExecutionPayloadSidecar::none(),
})
.unwrap();
assert!(
outcome1.outcome.is_valid() || outcome1.outcome.is_syncing(),
"First strategy path should work"
);
let s2 = include_str!("../../test-data/holesky/2.rlp");
let data2 = Bytes::from_str(s2).unwrap();
let block2 = Block::decode(&mut data2.as_ref()).unwrap();
let sealed2 = block2.seal_slow();
let hash2 = sealed2.hash();
let block2 = sealed2.into_block();
let payload2 = ExecutionPayloadV1::from_block_unchecked(hash2, &block2);
// Scenario 2: Test different strategy path (disconnected)
let outcome2 = test_harness
.tree
.on_new_payload(ExecutionData {
payload: payload2.into(),
sidecar: ExecutionPayloadSidecar::none(),
})
.unwrap();
assert!(outcome2.outcome.is_syncing(), "Second strategy path should work");
// This test passes if multiple StateRootStrategy scenarios work correctly,
// confirming that passing arguments directly doesn't break:
// - `StateRootTask` strategy with empty/non-empty prefix_sets
// - Dynamic strategy switching (StateRootTask → Parallel)
// - Parallel and Synchronous strategy paths
// - All parameter passing through the args struct
}
// ================================================================================================
// VALIDATE_BLOCK_WITH_STATE TEST SUITE
// ================================================================================================
//
// This test suite exercises `validate_block_with_state` across different scenarios including:
// - Basic block validation with state root computation
// - Strategy selection based on conditions (`StateRootTask`, `Parallel`, `Synchronous`)
// - Trie update retention and discard logic
// - Error precedence handling (consensus vs execution errors)
// - Different validation scenarios (valid, invalid consensus, invalid execution blocks)
/// Test `Synchronous` strategy when persistence is active
#[test]
fn test_validate_block_synchronous_strategy_during_persistence() {
reth_tracing::init_test_tracing();
let mut test_harness = ValidatorTestHarness::new(MAINNET.clone());
// Set up persistence action to force `Synchronous` strategy
use crate::tree::persistence_state::CurrentPersistenceAction;
let persistence_action = CurrentPersistenceAction::SavingBlocks {
highest: alloy_eips::NumHash::new(1, B256::random()),
};
test_harness.start_persistence_operation(persistence_action);
// Verify persistence is active
assert!(test_harness.is_persistence_in_progress());
// Create valid block
let mut block_factory = TestBlockFactory::new(MAINNET.as_ref().clone());
let genesis_hash = MAINNET.genesis_hash();
let valid_block = block_factory.create_valid_block(genesis_hash);
// Call validate_block_with_state directly
// This should execute the Synchronous strategy logic during active persistence
let result = test_harness.validate_block_direct(valid_block);
// Verify validation was attempted (may fail due to test environment limitations)
// The key test is that the Synchronous strategy path is executed during persistence
assert!(result.is_ok() || result.is_err(), "Validation should complete")
}
/// Test multiple validation scenarios including valid, consensus-invalid, and execution-invalid
/// blocks with proper result validation
#[test]
fn test_validate_block_multiple_scenarios() {
reth_tracing::init_test_tracing();
// Test multiple scenarios to ensure comprehensive coverage
let mut test_harness = ValidatorTestHarness::new(MAINNET.clone());
let mut block_factory = TestBlockFactory::new(MAINNET.as_ref().clone());
let genesis_hash = MAINNET.genesis_hash();
// Scenario 1: Valid block validation (may fail due to test environment limitations)
let valid_block = block_factory.create_valid_block(genesis_hash);
let result1 = test_harness.validate_block_direct(valid_block);
// Note: Valid blocks might fail in test environment due to missing provider data,
// but the important thing is that the validation logic executes without panicking
assert!(
result1.is_ok() || result1.is_err(),
"Valid block validation should complete (may fail due to test environment)"
);
// Scenario 2: Block with consensus issues should be rejected
let consensus_invalid = block_factory.create_invalid_consensus_block(genesis_hash);
let result2 = test_harness.validate_block_direct(consensus_invalid);
assert!(result2.is_err(), "Consensus-invalid block (invalid state root) should be rejected");
// Scenario 3: Block with execution issues should be rejected
let execution_invalid = block_factory.create_invalid_execution_block(genesis_hash);
let result3 = test_harness.validate_block_direct(execution_invalid);
assert!(result3.is_err(), "Execution-invalid block (gas limit exceeded) should be rejected");
// Verify all validation scenarios executed without panics
let total_calls = test_harness.validation_call_count();
assert!(
total_calls >= 2,
"At least invalid block validations should have executed (got {})",
total_calls
);
}
/// Test suite for the `check_invalid_ancestors` method
#[cfg(test)]
mod check_invalid_ancestors_tests {

-------- next file --------

@@ -107,6 +107,23 @@ pub trait Executor<DB: Database>: Sized {
Ok(BlockExecutionOutput { state: state.take_bundle(), result })
}
/// Executes the EVM with the given input and accepts a state closure that is always invoked
/// with the EVM state after execution, even after failure.
fn execute_with_state_closure_always<F>(
mut self,
block: &RecoveredBlock<<Self::Primitives as NodePrimitives>::Block>,
mut f: F,
) -> Result<BlockExecutionOutput<<Self::Primitives as NodePrimitives>::Receipt>, Self::Error>
where
F: FnMut(&State<DB>),
{
let result = self.execute_one(block);
let mut state = self.into_state();
f(&state);
Ok(BlockExecutionOutput { state: state.take_bundle(), result: result? })
}
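// Hypothetical caller sketch (illustration only; `executor` and `block` are
// assumed bindings). Because the error is propagated only after the closure
// runs, the closure also observes the state of a failing block:
//
//     let output = executor.execute_with_state_closure_always(&block, |state| {
//         tracing::debug!(cached_accounts = state.cache.accounts.len(), "post-execution state");
//     })?;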
/// Executes the EVM with the given input and accepts a state hook closure that is invoked with
/// the EVM state after execution.
fn execute_with_state_hook<F>(

-------- next file --------

@@ -1,5 +1,6 @@
use crate::errors::PingerError;
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
time::Duration,
@@ -77,7 +78,7 @@ impl Pinger {
}
}
PingState::WaitingForPong => {
if self.timeout_timer.is_elapsed() {
if self.timeout_timer.as_mut().poll(cx).is_ready() {
self.state = PingState::TimedOut;
return Poll::Ready(Ok(PingerEvent::Timeout))
}
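The switch from `is_elapsed()` to polling matters inside a hand-written poll loop: `is_elapsed()` only reads the timer's state and never registers the task's waker, so a pinger waiting for a pong may not be woken when the timeout fires. A minimal sketch (hypothetical type) of the polling pattern:

use std::{future::Future, pin::Pin, task::Context};
use tokio::time::Sleep;

struct Timeout {
    timer: Pin<Box<Sleep>>,
}

impl Timeout {
    /// Returns true once the timer fires; polling also arms the waker, so the
    /// task is woken at the deadline instead of waiting for another event.
    fn poll_timed_out(&mut self, cx: &mut Context<'_>) -> bool {
        self.timer.as_mut().poll(cx).is_ready()
    }
}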

-------- next file --------

@@ -7,9 +7,6 @@ use reth_network::{protocol::IntoRlpxSubProtocol, NetworkPrimitives};
use reth_transaction_pool::PoolConfig;
use std::{borrow::Cow, time::Duration};
/// 45M gas limit
const ETHEREUM_BLOCK_GAS_LIMIT_45M: u64 = 45_000_000;
/// 60M gas limit
const ETHEREUM_BLOCK_GAS_LIMIT_60M: u64 = 60_000_000;
@@ -48,7 +45,7 @@ pub trait PayloadBuilderConfig {
ChainKind::Named(NamedChain::Sepolia | NamedChain::Holesky | NamedChain::Hoodi) => {
ETHEREUM_BLOCK_GAS_LIMIT_60M
}
ChainKind::Named(NamedChain::Mainnet) => ETHEREUM_BLOCK_GAS_LIMIT_45M,
ChainKind::Named(NamedChain::Mainnet) => ETHEREUM_BLOCK_GAS_LIMIT_60M,
_ => ETHEREUM_BLOCK_GAS_LIMIT_36M,
}
}

-------- next file --------

@@ -37,14 +37,14 @@ struct NodeState {
current_stage: Option<CurrentStage>,
/// The latest block reached by either pipeline or consensus engine.
latest_block: Option<BlockNumber>,
/// The time of the latest block seen by the pipeline
latest_block_time: Option<u64>,
/// Hash of the head block last set by fork choice update
head_block_hash: Option<B256>,
/// Hash of the safe block last set by fork choice update
safe_block_hash: Option<B256>,
/// Hash of finalized block last set by fork choice update
finalized_block_hash: Option<B256>,
/// The time when we last logged a status message
last_status_log_time: Option<u64>,
}
impl NodeState {
@@ -56,10 +56,10 @@ impl NodeState {
peers_info,
current_stage: None,
latest_block,
latest_block_time: None,
head_block_hash: None,
safe_block_hash: None,
finalized_block_hash: None,
last_status_log_time: None,
}
}
@@ -271,8 +271,6 @@ impl NodeState {
}
ConsensusEngineEvent::CanonicalChainCommitted(head, elapsed) => {
self.latest_block = Some(head.number());
self.latest_block_time = Some(head.timestamp());
info!(number=head.number(), hash=?head.hash(), ?elapsed, "Canonical chain committed");
}
ConsensusEngineEvent::ForkBlockAdded(executed, elapsed) => {
@@ -483,25 +481,28 @@ where
)
}
}
} else if let Some(latest_block) = this.state.latest_block {
} else {
let now =
SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_secs();
if now.saturating_sub(this.state.latest_block_time.unwrap_or(0)) > 60 {
// Once we start receiving consensus nodes, don't emit status unless stalled for
// 1 minute
info!(
target: "reth::cli",
connected_peers = this.state.num_connected_peers(),
%latest_block,
"Status"
);
// Only log status if we haven't logged recently
if now.saturating_sub(this.state.last_status_log_time.unwrap_or(0)) > 60 {
if let Some(latest_block) = this.state.latest_block {
info!(
target: "reth::cli",
connected_peers = this.state.num_connected_peers(),
%latest_block,
"Status"
);
} else {
info!(
target: "reth::cli",
connected_peers = this.state.num_connected_peers(),
"Status"
);
}
this.state.last_status_log_time = Some(now);
}
} else {
info!(
target: "reth::cli",
connected_peers = this.state.num_connected_peers(),
"Status"
);
}
}

-------- next file --------

@@ -26,6 +26,7 @@ reth-storage-api.workspace = true
reth-node-api.workspace = true
reth-tasks.workspace = true
reth-metrics.workspace = true
reth-trie.workspace = true
# alloy
alloy-eips = { workspace = true, features = ["serde"] }
@@ -51,6 +52,7 @@ metrics.workspace = true
eyre.workspace = true
ringbuffer.workspace = true
derive_more.workspace = true
[dev-dependencies]
test-case.workspace = true

-------- next file --------

@@ -49,6 +49,10 @@ impl FlashBlockConsensusClient {
let block_hash = sequence.payload_base().parent_hash;
previous_block_hashes.push(block_hash);
if sequence.state_root().is_none() {
warn!("Missing state root for the complete sequence")
}
// Load previous block hashes. We're using (head - 32) and (head - 64) as the
// safe and finalized block hashes.
let safe_block_hash = self.get_previous_block_hash(&previous_block_hashes, 32);

-------- next file --------

@@ -1,25 +1,27 @@
//! A downstream integration of Flashblocks.
pub use payload::{
ExecutionPayloadBaseV1, ExecutionPayloadFlashblockDeltaV1, FlashBlock, Metadata,
ExecutionPayloadBaseV1, ExecutionPayloadFlashblockDeltaV1, FlashBlock, FlashBlockDecoder,
Metadata,
};
use reth_rpc_eth_types::PendingBlock;
pub use service::FlashBlockService;
pub use ws::{WsConnect, WsFlashBlockStream};
mod consensus;
pub use consensus::FlashBlockConsensusClient;
mod payload;
pub use payload::PendingFlashBlock;
mod sequence;
pub use sequence::FlashBlockCompleteSequence;
mod service;
mod worker;
mod ws;
/// Receiver of the most recent [`PendingBlock`] built out of [`FlashBlock`]s.
/// Receiver of the most recent [`PendingFlashBlock`] built out of [`FlashBlock`]s.
///
/// [`FlashBlock`]: crate::FlashBlock
pub type PendingBlockRx<N> = tokio::sync::watch::Receiver<Option<PendingBlock<N>>>;
pub type PendingBlockRx<N> = tokio::sync::watch::Receiver<Option<PendingFlashBlock<N>>>;
/// Receiver of the sequences of [`FlashBlock`]s built.
///

-------- next file --------

@@ -1,8 +1,12 @@
use alloy_consensus::BlockHeader;
use alloy_eips::eip4895::Withdrawal;
use alloy_primitives::{Address, Bloom, Bytes, B256, U256};
use alloy_primitives::{bytes, Address, Bloom, Bytes, B256, U256};
use alloy_rpc_types_engine::PayloadId;
use derive_more::Deref;
use reth_node_api::NodePrimitives;
use reth_optimism_evm::OpNextBlockEnvAttributes;
use reth_optimism_primitives::OpReceipt;
use reth_rpc_eth_types::PendingBlock;
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
@@ -39,6 +43,19 @@ impl FlashBlock {
}
}
/// A trait for decoding flashblocks from bytes.
pub trait FlashBlockDecoder: Send + 'static {
/// Decodes `bytes` into a [`FlashBlock`].
fn decode(&self, bytes: bytes::Bytes) -> eyre::Result<FlashBlock>;
}
/// Default implementation of the decoder.
impl FlashBlockDecoder for () {
fn decode(&self, bytes: bytes::Bytes) -> eyre::Result<FlashBlock> {
FlashBlock::decode(bytes)
}
}
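// A hypothetical alternative decoder (illustration only, not part of this
// change): pre-process the raw bytes, e.g. decompress them, before delegating
// to the default parsing.
struct DecompressingDecoder;

impl FlashBlockDecoder for DecompressingDecoder {
    fn decode(&self, bytes: bytes::Bytes) -> eyre::Result<FlashBlock> {
        // Decompression would happen here (omitted for brevity).
        FlashBlock::decode(bytes)
    }
}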
/// Provides metadata about the block that may be useful for indexing or analysis.
#[derive(Default, Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct Metadata {
@@ -119,3 +136,35 @@ impl From<ExecutionPayloadBaseV1> for OpNextBlockEnvAttributes {
}
}
}
/// The pending block built with all received Flashblocks alongside the metadata for the last added
/// Flashblock.
#[derive(Debug, Clone, Deref)]
pub struct PendingFlashBlock<N: NodePrimitives> {
/// The complete pending block built out of all received Flashblocks.
#[deref]
pub pending: PendingBlock<N>,
/// A sequential index that identifies the last Flashblock added to this block.
pub last_flashblock_index: u64,
/// The block hash of the last Flashblock.
pub last_flashblock_hash: B256,
/// Whether the [`PendingBlock`] has a properly computed state root.
pub has_computed_state_root: bool,
}
impl<N: NodePrimitives> PendingFlashBlock<N> {
/// Create new pending flashblock.
pub const fn new(
pending: PendingBlock<N>,
last_flashblock_index: u64,
last_flashblock_hash: B256,
has_computed_state_root: bool,
) -> Self {
Self { pending, last_flashblock_index, last_flashblock_hash, has_computed_state_root }
}
/// Returns the properly calculated state root for that block if it was computed.
pub fn computed_state_root(&self) -> Option<B256> {
self.has_computed_state_root.then_some(self.pending.block().state_root())
}
}

-------- next file --------

@@ -1,5 +1,6 @@
use crate::{ExecutionPayloadBaseV1, FlashBlock, FlashBlockCompleteSequenceRx};
use alloy_eips::eip2718::WithEncoded;
use alloy_primitives::B256;
use core::mem;
use eyre::{bail, OptionExt};
use reth_primitives_traits::{Recovered, SignedTransaction};
@@ -20,6 +21,8 @@ pub(crate) struct FlashBlockPendingSequence<T> {
inner: BTreeMap<u64, PreparedFlashBlock<T>>,
/// Broadcasts flashblocks to subscribers.
block_broadcaster: broadcast::Sender<FlashBlockCompleteSequence>,
/// Optional properly computed state root for the current sequence.
state_root: Option<B256>,
}
impl<T> FlashBlockPendingSequence<T>
@@ -30,7 +33,7 @@ where
// Note: if the channel is full, send will not block but rather overwrite the oldest
// messages. Order is preserved.
let (tx, _) = broadcast::channel(FLASHBLOCK_SEQUENCE_CHANNEL_SIZE);
Self { inner: BTreeMap::new(), block_broadcaster: tx }
Self { inner: BTreeMap::new(), block_broadcaster: tx, state_root: None }
}
/// Gets a subscriber to the flashblock sequences produced.
@@ -46,6 +49,7 @@ where
if self.block_broadcaster.receiver_count() > 0 {
let flashblocks = match FlashBlockCompleteSequence::new(
flashblocks.into_iter().map(|block| block.1.into()).collect(),
self.state_root,
) {
Ok(flashblocks) => flashblocks,
Err(err) => {
@@ -88,6 +92,11 @@ where
Ok(())
}
/// Sets the state root for the current sequence.
pub(crate) const fn set_state_root(&mut self, state_root: Option<B256>) {
self.state_root = state_root;
}
/// Iterator over sequence of executable transactions.
///
/// A flashblock is not ready if previous flashblocks are missing, i.e. there's a gap in
@@ -121,12 +130,26 @@ where
pub(crate) fn count(&self) -> usize {
self.inner.len()
}
/// Returns the reference to the last flashblock.
pub(crate) fn last_flashblock(&self) -> Option<&FlashBlock> {
self.inner.last_key_value().map(|(_, b)| &b.block)
}
/// Returns the current/latest flashblock index in the sequence
pub(crate) fn index(&self) -> Option<u64> {
Some(self.inner.values().last()?.block().index)
}
}
/// A complete sequence of flashblocks, often corresponding to a full block.
/// Ensures the invariants of a complete flashblock sequence.
#[derive(Debug, Clone)]
pub struct FlashBlockCompleteSequence(Vec<FlashBlock>);
pub struct FlashBlockCompleteSequence {
inner: Vec<FlashBlock>,
/// Optional state root for the current sequence
state_root: Option<B256>,
}
impl FlashBlockCompleteSequence {
/// Create a complete sequence from a vector of flashblocks.
@@ -134,7 +157,7 @@ impl FlashBlockCompleteSequence {
/// * vector is not empty
/// * first flashblock has the base payload
/// * sequence of flashblocks is sound (successive indices from 0, same payload id, ...)
pub fn new(blocks: Vec<FlashBlock>) -> eyre::Result<Self> {
pub fn new(blocks: Vec<FlashBlock>, state_root: Option<B256>) -> eyre::Result<Self> {
let first_block = blocks.first().ok_or_eyre("No flashblocks in sequence")?;
// Ensure that the first flashblock has a base payload
@@ -149,27 +172,32 @@ impl FlashBlockCompleteSequence {
bail!("Flashblock inconsistencies detected in sequence");
}
Ok(Self(blocks))
Ok(Self { inner: blocks, state_root })
}
/// Returns the block number
pub fn block_number(&self) -> u64 {
self.0.first().unwrap().metadata.block_number
self.inner.first().unwrap().metadata.block_number
}
/// Returns the payload base of the first flashblock.
pub fn payload_base(&self) -> &ExecutionPayloadBaseV1 {
self.0.first().unwrap().base.as_ref().unwrap()
self.inner.first().unwrap().base.as_ref().unwrap()
}
/// Returns the number of flashblocks in the sequence.
pub const fn count(&self) -> usize {
self.0.len()
self.inner.len()
}
/// Returns the last flashblock in the sequence.
pub fn last(&self) -> &FlashBlock {
self.0.last().unwrap()
self.inner.last().unwrap()
}
/// Returns the state root for the current sequence
pub const fn state_root(&self) -> Option<B256> {
self.state_root
}
}
@@ -177,7 +205,7 @@ impl Deref for FlashBlockCompleteSequence {
type Target = Vec<FlashBlock>;
fn deref(&self) -> &Self::Target {
&self.0
&self.inner
}
}
@@ -186,6 +214,7 @@ impl<T> TryFrom<FlashBlockPendingSequence<T>> for FlashBlockCompleteSequence {
fn try_from(sequence: FlashBlockPendingSequence<T>) -> Result<Self, Self::Error> {
Self::new(
sequence.inner.into_values().map(|block| block.block().clone()).collect::<Vec<_>>(),
sequence.state_root,
)
}
}
@@ -230,6 +259,14 @@ where
}
}
impl<T> Deref for PreparedFlashBlock<T> {
type Target = FlashBlock;
fn deref(&self) -> &Self::Target {
&self.block
}
}
#[cfg(test)]
mod tests {
use super::*;
@@ -333,7 +370,7 @@ mod tests {
let flashblocks = subscriber.try_recv().unwrap();
assert_eq!(flashblocks.count(), 10);
for (idx, block) in flashblocks.0.iter().enumerate() {
for (idx, block) in flashblocks.iter().enumerate() {
assert_eq!(block.index, idx as u64);
}
}

-------- next file --------

@@ -1,7 +1,7 @@
use crate::{
sequence::FlashBlockPendingSequence,
worker::{BuildArgs, FlashBlockBuilder},
ExecutionPayloadBaseV1, FlashBlock, FlashBlockCompleteSequenceRx,
ExecutionPayloadBaseV1, FlashBlock, FlashBlockCompleteSequenceRx, PendingFlashBlock,
};
use alloy_eips::eip2718::WithEncoded;
use alloy_primitives::B256;
@@ -14,7 +14,6 @@ use reth_primitives_traits::{
AlloyBlockHeader, BlockTy, HeaderTy, NodePrimitives, ReceiptTy, Recovered,
};
use reth_revm::cached::CachedReads;
use reth_rpc_eth_types::PendingBlock;
use reth_storage_api::{BlockReaderIdExt, StateProviderFactory};
use reth_tasks::TaskExecutor;
use std::{
@@ -25,7 +24,9 @@ use std::{
use tokio::{pin, sync::oneshot};
use tracing::{debug, trace, warn};
/// The `FlashBlockService` maintains an in-memory [`PendingBlock`] built out of a sequence of
pub(crate) const FB_STATE_ROOT_FROM_INDEX: usize = 9;
/// The `FlashBlockService` maintains an in-memory [`PendingFlashBlock`] built out of a sequence of
/// [`FlashBlock`]s.
#[derive(Debug)]
pub struct FlashBlockService<
@@ -35,7 +36,7 @@ pub struct FlashBlockService<
Provider,
> {
rx: S,
current: Option<PendingBlock<N>>,
current: Option<PendingFlashBlock<N>>,
blocks: FlashBlockPendingSequence<N::SignedTx>,
rebuild: bool,
builder: FlashBlockBuilder<EvmConfig, Provider>,
@@ -43,11 +44,13 @@ pub struct FlashBlockService<
spawner: TaskExecutor,
job: Option<BuildJob<N>>,
/// Cached state reads for the current block.
/// Current `PendingBlock` is built out of a sequence of `FlashBlocks`, and executed again when
/// fb received on top of the same block. Avoid redundant I/O across multiple executions
/// within the same block.
/// Current `PendingFlashBlock` is built out of a sequence of `FlashBlocks`, and executed again
/// when a flashblock is received on top of the same block. Avoids redundant I/O across
/// multiple executions within the same block.
cached_state: Option<(B256, CachedReads)>,
metrics: FlashBlockServiceMetrics,
/// Enables state root calculation starting from the flashblock with index [`FB_STATE_ROOT_FROM_INDEX`]
compute_state_root: bool,
}
impl<N, S, EvmConfig, Provider> FlashBlockService<N, S, EvmConfig, Provider>
@@ -81,9 +84,16 @@ where
job: None,
cached_state: None,
metrics: FlashBlockServiceMetrics::default(),
compute_state_root: false,
}
}
/// Enables state root calculation from flashblocks.
pub const fn compute_state_root(mut self, enable_state_root: bool) -> Self {
self.compute_state_root = enable_state_root;
self
}
/// Returns a subscriber to the flashblock sequence.
pub fn subscribe_block_sequence(&self) -> FlashBlockCompleteSequenceRx {
self.blocks.subscribe_block_sequence()
@@ -92,7 +102,7 @@ where
/// Drives the services and sends new blocks to the receiver
///
/// Note: this should be spawned
pub async fn run(mut self, tx: tokio::sync::watch::Sender<Option<PendingBlock<N>>>) {
pub async fn run(mut self, tx: tokio::sync::watch::Sender<Option<PendingFlashBlock<N>>>) {
while let Some(block) = self.next().await {
if let Ok(block) = block.inspect_err(|e| tracing::error!("{e}")) {
let _ = tx.send(block).inspect_err(|e| tracing::error!("{e}"));
@@ -128,18 +138,30 @@ where
latest.hash() != base.parent_hash
{
trace!(flashblock_parent=?base.parent_hash, flashblock_number=base.block_number, local_latest=?latest.num_hash(), "Skipping non-consecutive build attempt");
return None;
return None
}
let Some(last_flashblock) = self.blocks.last_flashblock() else {
trace!(flashblock_number = ?self.blocks.block_number(), count = %self.blocks.count(), "Missing last flashblock");
return None
};
// Check if state root must be computed
let compute_state_root =
self.compute_state_root && self.blocks.index() >= Some(FB_STATE_ROOT_FROM_INDEX as u64);
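// Note (added for clarity): `Option`'s ordering treats `None` as less than any
// `Some`, so an empty sequence (`index() == None`) never requests a state root.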
Some(BuildArgs {
base,
transactions: self.blocks.ready_transactions().collect::<Vec<_>>(),
cached_state: self.cached_state.take(),
last_flashblock_index: last_flashblock.index,
last_flashblock_hash: last_flashblock.diff.block_hash,
compute_state_root,
})
}
/// Takes out `current` [`PendingBlock`] if `state` is not preceding it.
fn on_new_tip(&mut self, state: CanonStateNotification<N>) -> Option<PendingBlock<N>> {
/// Takes out `current` [`PendingFlashBlock`] if `state` is not preceding it.
fn on_new_tip(&mut self, state: CanonStateNotification<N>) -> Option<PendingFlashBlock<N>> {
let tip = state.tip_checked()?;
let tip_hash = tip.hash();
let current = self.current.take_if(|current| current.parent_hash() != tip_hash);
@@ -180,7 +202,7 @@ where
+ Clone
+ 'static,
{
type Item = eyre::Result<Option<PendingBlock<N>>>;
type Item = eyre::Result<Option<PendingFlashBlock<N>>>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
@@ -200,6 +222,9 @@ where
if let Some((now, result)) = result {
match result {
Ok(Some((new_pending, cached_reads))) => {
// update state root of the current sequence
this.blocks.set_state_root(new_pending.computed_state_root());
// built a new pending block
this.current = Some(new_pending.clone());
// cache reads
@@ -286,7 +311,7 @@ where
}
type BuildJob<N> =
(Instant, oneshot::Receiver<eyre::Result<Option<(PendingBlock<N>, CachedReads)>>>);
(Instant, oneshot::Receiver<eyre::Result<Option<(PendingFlashBlock<N>, CachedReads)>>>);
#[derive(Metrics)]
#[metrics(scope = "flashblock_service")]

-------- next file --------

@@ -1,4 +1,4 @@
use crate::ExecutionPayloadBaseV1;
use crate::{ExecutionPayloadBaseV1, PendingFlashBlock};
use alloy_eips::{eip2718::WithEncoded, BlockNumberOrTag};
use alloy_primitives::B256;
use reth_chain_state::{CanonStateSubscriptions, ExecutedBlock};
@@ -38,9 +38,12 @@ impl<EvmConfig, Provider> FlashBlockBuilder<EvmConfig, Provider> {
}
pub(crate) struct BuildArgs<I> {
pub base: ExecutionPayloadBaseV1,
pub transactions: I,
pub cached_state: Option<(B256, CachedReads)>,
pub(crate) base: ExecutionPayloadBaseV1,
pub(crate) transactions: I,
pub(crate) cached_state: Option<(B256, CachedReads)>,
pub(crate) last_flashblock_index: u64,
pub(crate) last_flashblock_hash: B256,
pub(crate) compute_state_root: bool,
}
impl<N, EvmConfig, Provider> FlashBlockBuilder<EvmConfig, Provider>
@@ -56,14 +59,14 @@ where
Receipt = ReceiptTy<N>,
> + Unpin,
{
/// Returns the [`PendingBlock`] made purely out of transactions and [`ExecutionPayloadBaseV1`]
/// in `args`.
/// Returns the [`PendingFlashBlock`] made purely out of transactions and
/// [`ExecutionPayloadBaseV1`] in `args`.
///
/// Returns `None` if the flashblock doesn't attach to the latest header.
pub(crate) fn execute<I: IntoIterator<Item = WithEncoded<Recovered<N::SignedTx>>>>(
&self,
mut args: BuildArgs<I>,
) -> eyre::Result<Option<(PendingBlock<N>, CachedReads)>> {
) -> eyre::Result<Option<(PendingFlashBlock<N>, CachedReads)>> {
trace!("Attempting new pending block from flashblocks");
let latest = self
@@ -100,8 +103,13 @@ where
let _gas_used = builder.execute_transaction(tx)?;
}
// compute the real state root if requested; otherwise fall back to the noop provider
let BlockBuilderOutcome { execution_result, block, hashed_state, .. } =
builder.finish(NoopProvider::default())?;
if args.compute_state_root {
builder.finish(&state_provider)?
} else {
builder.finish(NoopProvider::default())?
};
let execution_outcome = ExecutionOutcome::new(
state.take_bundle(),
@@ -110,17 +118,22 @@ where
vec![execution_result.requests],
);
Ok(Some((
PendingBlock::with_executed_block(
Instant::now() + Duration::from_secs(1),
ExecutedBlock {
recovered_block: block.into(),
execution_output: Arc::new(execution_outcome),
hashed_state: Arc::new(hashed_state),
},
),
request_cache,
)))
let pending_block = PendingBlock::with_executed_block(
Instant::now() + Duration::from_secs(1),
ExecutedBlock {
recovered_block: block.into(),
execution_output: Arc::new(execution_outcome),
hashed_state: Arc::new(hashed_state),
},
);
let pending_flashblock = PendingFlashBlock::new(
pending_block,
args.last_flashblock_index,
args.last_flashblock_hash,
args.compute_state_root,
);
Ok(Some((pending_flashblock, request_cache)))
}
}

View File

@@ -1,4 +1,4 @@
use crate::FlashBlock;
use crate::{FlashBlock, FlashBlockDecoder};
use futures_util::{
stream::{SplitSink, SplitStream},
FutureExt, Sink, Stream, StreamExt,
@@ -28,6 +28,7 @@ pub struct WsFlashBlockStream<Stream, Sink, Connector> {
ws_url: Url,
state: State,
connector: Connector,
decoder: Box<dyn FlashBlockDecoder>,
connect: ConnectFuture<Sink, Stream>,
stream: Option<Stream>,
sink: Option<Sink>,
@@ -40,11 +41,17 @@ impl WsFlashBlockStream<WsStream, WsSink, WsConnector> {
ws_url,
state: State::default(),
connector: WsConnector,
decoder: Box::new(()),
connect: Box::pin(async move { Err(Error::ConnectionClosed)? }),
stream: None,
sink: None,
}
}
/// Sets the [`FlashBlock`] decoder for the websocket stream.
pub fn with_decoder(self, decoder: Box<dyn FlashBlockDecoder>) -> Self {
Self { decoder, ..self }
}
}
impl<Stream, S, C> WsFlashBlockStream<Stream, S, C> {
@@ -53,6 +60,7 @@ impl<Stream, S, C> WsFlashBlockStream<Stream, S, C> {
Self {
ws_url,
state: State::default(),
decoder: Box::new(()),
connector,
connect: Box::pin(async move { Err(Error::ConnectionClosed)? }),
stream: None,
@@ -111,10 +119,10 @@ where
match msg {
Ok(Message::Binary(bytes)) => {
return Poll::Ready(Some(FlashBlock::decode(bytes)))
return Poll::Ready(Some(this.decoder.decode(bytes)))
}
Ok(Message::Text(bytes)) => {
return Poll::Ready(Some(FlashBlock::decode(bytes.into())))
return Poll::Ready(Some(this.decoder.decode(bytes.into())))
}
Ok(Message::Ping(bytes)) => this.ping(bytes),
Ok(Message::Close(frame)) => this.close(frame),
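A hypothetical custom decoder, assuming the `FlashBlockDecoder` trait mirrors the call sites above, i.e. a `decode(&self, Bytes) -> eyre::Result<FlashBlock>` method (the default `Box::new(())` impl presumably delegates to `FlashBlock::decode`):

struct StrictDecoder;

impl FlashBlockDecoder for StrictDecoder {
    fn decode(&self, bytes: bytes::Bytes) -> eyre::Result<FlashBlock> {
        // Reject empty frames before handing off to the default decoding.
        eyre::ensure!(!bytes.is_empty(), "empty flashblock frame");
        FlashBlock::decode(bytes)
    }
}

// let stream = WsFlashBlockStream::new(ws_url).with_decoder(Box::new(StrictDecoder));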

View File

@@ -139,7 +139,7 @@ impl<N: RpcNodeCore, Rpc: RpcConvert> OpEthApi<N, Rpc> {
parent.hash() == pending_block.block().parent_hash() &&
now <= pending_block.expires_at
{
return Ok(Some(pending_block.clone()));
return Ok(Some(pending_block.pending.clone()));
}
Ok(None)

View File

@@ -175,7 +175,10 @@ where
/// the response if it was forwarded.
async fn maybe_forward_request(&self, req: &Request<'_>) -> Option<MethodResponse> {
let should_forward = match req.method_name() {
"debug_traceTransaction" => self.should_forward_transaction(req),
"debug_traceTransaction" |
"eth_getTransactionByHash" |
"eth_getTransactionReceipt" |
"eth_getRawTransactionByHash" => self.should_forward_transaction(req),
method => self.should_forward_block_request(method, req),
};
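A minimal sketch of the dispatch shape, with boolean stand-ins for the two `should_forward_*` helpers above:

fn should_forward(method: &str, tx_is_local: bool, block_is_local: bool) -> bool {
    match method {
        // Transaction-scoped methods: forward when the tx isn't known locally.
        "debug_traceTransaction" |
        "eth_getTransactionByHash" |
        "eth_getTransactionReceipt" |
        "eth_getRawTransactionByHash" => !tx_is_local,
        // Everything else is decided per block availability.
        _ => !block_is_local,
    }
}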

View File

@@ -462,9 +462,7 @@ impl<B: Block> Eq for RecoveredBlock<B> {}
impl<B: Block> PartialEq for RecoveredBlock<B> {
fn eq(&self, other: &Self) -> bool {
self.hash_ref().eq(other.hash_ref()) &&
self.block.eq(&other.block) &&
self.senders.eq(&other.senders)
self.block.eq(&other.block) && self.senders.eq(&other.senders)
}
}

View File

@@ -1,6 +1,6 @@
//! Loads chain configuration.
use alloy_consensus::{BlockHeader, Header};
use alloy_consensus::Header;
use alloy_eips::eip7910::{EthConfig, EthForkConfig, SystemContract};
use alloy_evm::precompiles::Precompile;
use alloy_primitives::Address;
@@ -14,6 +14,7 @@ use reth_rpc_eth_types::EthApiError;
use reth_storage_api::BlockReaderIdExt;
use std::collections::BTreeMap;
/// RPC endpoint support for [EIP-7910](https://eips.ethereum.org/EIPS/eip-7910)
#[cfg_attr(not(feature = "client"), rpc(server, namespace = "eth"))]
#[cfg_attr(feature = "client", rpc(server, client, namespace = "eth"))]
pub trait EthConfigApi {
@@ -88,11 +89,6 @@ where
.ok_or_else(|| ProviderError::BestBlockNotFound)?
.into_header();
// Short-circuit if Cancun is not active.
if !chain_spec.is_cancun_active_at_timestamp(latest.timestamp()) {
return Err(RethError::msg("cancun has not been activated"))
}
let current_precompiles = evm_to_precompiles_map(
self.evm_config.evm_for_block(EmptyDB::default(), &latest).map_err(RethError::other)?,
);

View File

@@ -2833,6 +2833,11 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider + 'static> BlockWrite
}
fn remove_blocks_above(&self, block: BlockNumber) -> ProviderResult<()> {
// Clean up HeaderNumbers for the blocks being removed; all indexes must be cleared from MDBX.
for hash in self.canonical_hashes_range(block + 1, self.last_block_number()? + 1)? {
self.tx.delete::<tables::HeaderNumbers>(hash, None)?;
}
// Get highest static file block for the total block range
let highest_static_file_block = self
.static_file_provider()

View File

@@ -247,7 +247,7 @@ impl<T: ParkedOrd> ParkedPool<T> {
assert_eq!(
self.last_sender_submission.len(),
self.sender_transaction_count.len(),
"last_sender_transaction.len() != sender_to_last_transaction.len()"
"last_sender_submission.len() != sender_transaction_count.len()"
);
}
}

View File

@@ -571,16 +571,16 @@ impl<T: TransactionOrdering> PendingPool<T> {
pub(crate) fn assert_invariants(&self) {
assert!(
self.independent_transactions.len() <= self.by_id.len(),
"independent.len() > all.len()"
"independent_transactions.len() > by_id.len()"
);
assert!(
self.highest_nonces.len() <= self.by_id.len(),
"independent_descendants.len() > all.len()"
"highest_nonces.len() > by_id.len()"
);
assert_eq!(
self.highest_nonces.len(),
self.independent_transactions.len(),
"independent.len() = independent_descendants.len()"
"highest_nonces.len() != independent_transactions.len()"
);
}
}

View File

@@ -2110,7 +2110,7 @@ impl<T: PoolTransaction> AllTransactions<T> {
#[cfg(any(test, feature = "test-utils"))]
pub(crate) fn assert_invariants(&self) {
assert_eq!(self.by_hash.len(), self.txs.len(), "by_hash.len() != txs.len()");
assert!(self.auths.len() <= self.txs.len(), "auths > txs.len()");
assert!(self.auths.len() <= self.txs.len(), "auths.len() > txs.len()");
}
}

View File

@@ -1476,8 +1476,8 @@ impl MockFeeRange {
max_fee_blob: Range<u128>,
) -> Self {
assert!(
max_fee.start <= priority_fee.end,
"max_fee_range should be strictly below the priority fee range"
max_fee.start >= priority_fee.end,
"max_fee_range should be strictly above the priority fee range"
);
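// Rationale for the assertion above (EIP-1559): a valid transaction must satisfy
// `max_fee_per_gas >= max_priority_fee_per_gas`, so sampling the max fee from a
// range that starts at or above the end of the priority-fee range keeps every
// generated pair valid.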
Self {
gas_price: gas_price.try_into().unwrap(),

View File

@@ -106,8 +106,8 @@ impl<Factory> ParallelProof<Factory>
where
Factory: DatabaseProviderFactory<Provider: BlockReader> + Clone + 'static,
{
/// Spawns a storage proof on the storage proof task and returns a receiver for the result.
fn spawn_storage_proof(
/// Queues a storage proof task and returns a receiver for the result.
fn queue_storage_proof(
&self,
hashed_address: B256,
prefix_set: PrefixSet,
@@ -144,7 +144,7 @@ where
"Starting storage proof generation"
);
let receiver = self.spawn_storage_proof(hashed_address, prefix_set, target_slots);
let receiver = self.queue_storage_proof(hashed_address, prefix_set, target_slots);
let proof_result = receiver.recv().map_err(|_| {
ParallelStateRootError::StorageRoot(StorageRootError::Database(DatabaseError::Other(
format!("channel closed for {hashed_address}"),
@@ -161,16 +161,6 @@ where
proof_result
}
/// Generate a [`DecodedStorageMultiProof`] for the given proof by first calling
/// `storage_proof`, then decoding the proof nodes.
pub fn decoded_storage_proof(
self,
hashed_address: B256,
target_slots: B256Set,
) -> Result<DecodedStorageMultiProof, ParallelStateRootError> {
self.storage_proof(hashed_address, target_slots)
}
/// Generate a state multiproof according to specified targets.
pub fn decoded_multiproof(
self,
@@ -217,7 +207,7 @@ where
storage_root_targets.into_iter().sorted_unstable_by_key(|(address, _)| *address)
{
let target_slots = targets.get(&hashed_address).cloned().unwrap_or_default();
let receiver = self.spawn_storage_proof(hashed_address, prefix_set, target_slots);
let receiver = self.queue_storage_proof(hashed_address, prefix_set, target_slots);
// store the receiver for the result alongside the hashed address so it can be
// awaited in place when we iterate over the trie
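A minimal sketch of this queue-then-await pattern, using std threads and channels as a stand-in for the proof task transport:

use std::sync::mpsc;

fn queue_proof(account: u64) -> mpsc::Receiver<u64> {
    let (tx, rx) = mpsc::channel();
    std::thread::spawn(move || {
        let _ = tx.send(account * 2); // placeholder for the real storage proof work
    });
    rx
}

fn main() {
    // Queue everything up front so the proofs run concurrently...
    let receivers: Vec<_> = (0..4).map(queue_proof).collect();
    // ...then drain the receivers in order, blocking only where a proof is still pending.
    for rx in receivers {
        println!("{}", rx.recv().expect("proof task dropped"));
    }
}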

View File

@@ -21,7 +21,7 @@ export default defineConfig({
},
{ text: 'GitHub', link: 'https://github.com/paradigmxyz/reth' },
{
text: 'v1.8.1',
text: 'v1.8.2',
items: [
{
text: 'Releases',

View File

@@ -54,8 +54,10 @@ impl Suite for BlockchainTests {
/// An Ethereum blockchain test.
#[derive(Debug, PartialEq, Eq)]
pub struct BlockchainTestCase {
tests: BTreeMap<String, BlockchainTest>,
skip: bool,
/// The tests within this test case.
pub tests: BTreeMap<String, BlockchainTest>,
/// Whether to skip this test case.
pub skip: bool,
}
impl BlockchainTestCase {
@@ -96,39 +98,45 @@ impl BlockchainTestCase {
}
/// Execute a single `BlockchainTest`, validating the outcome against the
/// expectations encoded in the JSON file.
fn run_single_case(name: &str, case: &BlockchainTest) -> Result<(), Error> {
/// expectations encoded in the JSON file. Returns the list of executed blocks
/// with their execution witnesses.
pub fn run_single_case(
name: &str,
case: &BlockchainTest,
) -> Result<Vec<(RecoveredBlock<Block>, ExecutionWitness)>, Error> {
let expectation = Self::expected_failure(case);
match run_case(case) {
// All blocks executed successfully.
Ok(()) => {
Ok(program_inputs) => {
// Check if the test case specifies that it should have failed
if let Some((block, msg)) = expectation {
Err(Error::Assertion(format!(
"Test case: {name}\nExpected failure at block {block} - {msg}, but all blocks succeeded",
)))
} else {
Ok(())
Ok(program_inputs)
}
}
// A block processing failure occurred.
err @ Err(Error::BlockProcessingFailed { block_number, .. }) => match expectation {
// It happened on exactly the block we were told to fail on
Some((expected, _)) if block_number == expected => Ok(()),
Err(Error::BlockProcessingFailed { block_number, partial_program_inputs, err }) => {
match expectation {
// It happened on exactly the block we were told to fail on
Some((expected, _)) if block_number == expected => Ok(partial_program_inputs),
// Uncle sidechain edge case, we accept as long as it failed.
// But we don't check the exact block number.
_ if Self::is_uncle_sidechain_case(name) => Ok(()),
// Uncle sidechain edge case, we accept as long as it failed.
// But we don't check the exact block number.
_ if Self::is_uncle_sidechain_case(name) => Ok(partial_program_inputs),
// Expected failure, but block number does not match
Some((expected, _)) => Err(Error::Assertion(format!(
"Test case: {name}\nExpected failure at block {expected}\nGot failure at block {block_number}",
))),
// Expected failure, but block number does not match
Some((expected, _)) => Err(Error::Assertion(format!(
"Test case: {name}\nExpected failure at block {expected}\nGot failure at block {block_number}",
))),
// No failure expected at all - bubble up original error.
None => err,
},
// No failure expected at all - bubble up original error.
None => Err(Error::BlockProcessingFailed { block_number, partial_program_inputs, err }),
}
}
// Non-processing errors are forwarded as-is.
//
@@ -170,14 +178,14 @@ impl Case for BlockchainTestCase {
.iter()
.filter(|(_, case)| !Self::excluded_fork(case.network))
.par_bridge()
.try_for_each(|(name, case)| Self::run_single_case(name, case))?;
.try_for_each(|(name, case)| Self::run_single_case(name, case).map(|_| ()))?;
Ok(())
}
}
/// Executes a single `BlockchainTest`, returning an error if the blockchain state
/// does not match the expected outcome after all blocks are executed.
/// Executes a single `BlockchainTest`, returning an error as soon as any block has a
/// consensus validation failure.
///
/// A `BlockchainTest` represents a self-contained scenario:
/// - It initializes a fresh blockchain state.
@@ -186,9 +194,13 @@ impl Case for BlockchainTestCase {
/// outcome.
///
/// Returns:
/// - `Ok(())` if all blocks execute successfully and the final state is correct.
/// - `Err(Error)` if any block fails to execute correctly, or if the post-state validation fails.
fn run_case(case: &BlockchainTest) -> Result<(), Error> {
/// - `Ok(_)` if all blocks execute successfully, returning recovered blocks and full block
/// execution witness.
/// - `Err(Error)` if any block fails to execute correctly, returning a partial block execution
/// witness if the error is of variant `BlockProcessingFailed`.
fn run_case(
case: &BlockchainTest,
) -> Result<Vec<(RecoveredBlock<Block>, ExecutionWitness)>, Error> {
// Create a new test database and initialize a provider for the test case.
let chain_spec: Arc<ChainSpec> = Arc::new(case.network.into());
let factory = create_test_provider_factory_with_chain_spec(chain_spec.clone());
@@ -202,22 +214,24 @@ fn run_case(case: &BlockchainTest) -> Result<(), Error> {
.try_recover()
.unwrap();
provider.insert_block(genesis_block.clone()).map_err(|err| Error::block_failed(0, err))?;
provider
.insert_block(genesis_block.clone())
.map_err(|err| Error::block_failed(0, Default::default(), err))?;
// Increment block number for receipts static file
provider
.static_file_provider()
.latest_writer(StaticFileSegment::Receipts)
.and_then(|mut writer| writer.increment_block(0))
.map_err(|err| Error::block_failed(0, err))?;
.map_err(|err| Error::block_failed(0, Default::default(), err))?;
let genesis_state = case.pre.clone().into_genesis_state();
insert_genesis_state(&provider, genesis_state.iter())
.map_err(|err| Error::block_failed(0, err))?;
.map_err(|err| Error::block_failed(0, Default::default(), err))?;
insert_genesis_hashes(&provider, genesis_state.iter())
.map_err(|err| Error::block_failed(0, err))?;
.map_err(|err| Error::block_failed(0, Default::default(), err))?;
insert_genesis_history(&provider, genesis_state.iter())
.map_err(|err| Error::block_failed(0, err))?;
.map_err(|err| Error::block_failed(0, Default::default(), err))?;
// Decode blocks
let blocks = decode_blocks(&case.blocks)?;
@@ -233,16 +247,18 @@ fn run_case(case: &BlockchainTest) -> Result<(), Error> {
// Insert the block into the database
provider
.insert_block(block.clone())
.map_err(|err| Error::block_failed(block_number, err))?;
.map_err(|err| Error::block_failed(block_number, Default::default(), err))?;
// Commit static files, so we can query the headers for stateless execution below
provider
.static_file_provider()
.commit()
.map_err(|err| Error::block_failed(block_number, err))?;
.map_err(|err| Error::block_failed(block_number, Default::default(), err))?;
// Consensus checks before block execution
pre_execution_checks(chain_spec.clone(), &parent, block)
.map_err(|err| Error::block_failed(block_number, err))?;
pre_execution_checks(chain_spec.clone(), &parent, block).map_err(|err| {
program_inputs.push((block.clone(), execution_witness_with_parent(&parent)));
Error::block_failed(block_number, program_inputs.clone(), err)
})?;
let mut witness_record = ExecutionWitnessRecord::default();
@@ -252,12 +268,13 @@ fn run_case(case: &BlockchainTest) -> Result<(), Error> {
let executor = executor_provider.batch_executor(state_db);
let output = executor
.execute_with_state_closure(&(*block).clone(), |statedb: &State<_>| {
.execute_with_state_closure_always(&(*block).clone(), |statedb: &State<_>| {
witness_record.record_executed_state(statedb);
})
.map_err(|err| Error::block_failed(block_number, err))?;
.map_err(|err| Error::block_failed(block_number, program_inputs.clone(), err))?;
// Consensus checks after block execution
validate_block_post_execution(
block,
&chain_spec,
@@ -265,7 +282,7 @@ fn run_case(case: &BlockchainTest) -> Result<(), Error> {
&output.requests,
&output.block_access_list,
)
.map_err(|err| Error::block_failed(block_number, err))?;
.map_err(|err| Error::block_failed(block_number, program_inputs.clone(), err))?;
// Generate the stateless witness
// TODO: Most of this code is copy-pasted from debug_executionWitness
@@ -299,25 +316,26 @@ fn run_case(case: &BlockchainTest) -> Result<(), Error> {
HashedPostState::from_bundle_state::<KeccakKeyHasher>(output.state.state());
let (computed_state_root, _) =
StateRoot::overlay_root_with_updates(provider.tx_ref(), hashed_state.clone())
.map_err(|err| Error::block_failed(block_number, err))?;
.map_err(|err| Error::block_failed(block_number, program_inputs.clone(), err))?;
if computed_state_root != block.state_root {
return Err(Error::block_failed(
block_number,
program_inputs.clone(),
Error::Assertion("state root mismatch".to_string()),
))
));
}
// Commit the post state/state diff to the database
provider
.write_state(&ExecutionOutcome::single(block.number, output), OriginalValuesKnown::Yes)
.map_err(|err| Error::block_failed(block_number, err))?;
.map_err(|err| Error::block_failed(block_number, program_inputs.clone(), err))?;
provider
.write_hashed_state(&hashed_state.into_sorted())
.map_err(|err| Error::block_failed(block_number, err))?;
.map_err(|err| Error::block_failed(block_number, program_inputs.clone(), err))?;
provider
.update_history_indices(block.number..=block.number)
.map_err(|err| Error::block_failed(block_number, err))?;
.map_err(|err| Error::block_failed(block_number, program_inputs.clone(), err))?;
// Since there were no errors, update the parent block
parent = block.clone()
@@ -345,17 +363,17 @@ fn run_case(case: &BlockchainTest) -> Result<(), Error> {
}
// Now validate using the stateless client if everything else passes
for (block, execution_witness) in program_inputs {
for (block, execution_witness) in &program_inputs {
stateless_validation(
block,
execution_witness,
block.clone(),
execution_witness.clone(),
chain_spec.clone(),
EthEvmConfig::new(chain_spec.clone()),
)
.expect("stateless validation failed");
}
Ok(())
Ok(program_inputs)
}
fn decode_blocks(
@@ -368,10 +386,12 @@ fn decode_blocks(
let block_number = (block_index + 1) as u64;
let decoded = SealedBlock::<Block>::decode(&mut block.rlp.as_ref())
.map_err(|err| Error::block_failed(block_number, err))?;
.map_err(|err| Error::block_failed(block_number, Default::default(), err))?;
let recovered_block =
decoded.clone().try_recover().map_err(|err| Error::block_failed(block_number, err))?;
let recovered_block = decoded
.clone()
.try_recover()
.map_err(|err| Error::block_failed(block_number, Default::default(), err))?;
blocks.push(recovered_block);
}
@@ -466,3 +486,9 @@ fn path_contains(path_str: &str, rhs: &[&str]) -> bool {
let rhs = rhs.join(std::path::MAIN_SEPARATOR_STR);
path_str.contains(&rhs)
}
fn execution_witness_with_parent(parent: &RecoveredBlock<Block>) -> ExecutionWitness {
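// Even for an invalid block, the stateless-validation guest needs the parent
// header to run its pre-execution checks, so seed the witness with just that
// RLP-encoded header.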
let mut serialized_header = Vec::new();
parent.header().encode(&mut serialized_header);
ExecutionWitness { headers: vec![serialized_header.into()], ..Default::default() }
}

View File

@@ -2,7 +2,10 @@
use crate::Case;
use reth_db::DatabaseError;
use reth_ethereum_primitives::Block;
use reth_primitives_traits::RecoveredBlock;
use reth_provider::ProviderError;
use reth_stateless::ExecutionWitness;
use std::path::{Path, PathBuf};
use thiserror::Error;
@@ -24,6 +27,9 @@ pub enum Error {
BlockProcessingFailed {
/// The block number for the block that failed
block_number: u64,
/// Contains the inputs necessary for the block stateless validation guest program used in
/// zkVMs to prove the block is invalid.
partial_program_inputs: Vec<(RecoveredBlock<Block>, ExecutionWitness)>,
/// The specific error
#[source]
err: Box<dyn std::error::Error + Send + Sync>,
@@ -67,9 +73,10 @@ impl Error {
/// Create a new [`Error::BlockProcessingFailed`] error.
pub fn block_failed(
block_number: u64,
partial_program_inputs: Vec<(RecoveredBlock<Block>, ExecutionWitness)>,
err: impl std::error::Error + Send + Sync + 'static,
) -> Self {
Self::BlockProcessingFailed { block_number, err: Box::new(err) }
Self::BlockProcessingFailed { block_number, partial_program_inputs, err: Box::new(err) }
}
}