diff --git a/crates/ethereum/evm/src/execute.rs b/crates/ethereum/evm/src/execute.rs index d0d3414cb6..52c8bd5c31 100644 --- a/crates/ethereum/evm/src/execute.rs +++ b/crates/ethereum/evm/src/execute.rs @@ -20,7 +20,8 @@ use reth_evm::{ }; use reth_execution_types::ExecutionOutcome; use reth_primitives::{ - BlockNumber, BlockWithSenders, EthereumHardfork, Header, Receipt, Request, U256, + BlockNumber, BlockWithSenders, EthereumHardfork, Header, Receipt, Receipts, Request, Requests, + U256, }; use reth_prune_types::PruneModes; use reth_revm::{ @@ -416,6 +417,28 @@ where type Output = ExecutionOutcome; type Error = BlockExecutionError; + fn execute( + &mut self, + input: Self::Input<'_>, + ) -> Result<(Vec, Vec), Self::Error> { + let BlockExecutionInput { block, total_difficulty } = input; + + if self.batch_record.first_block().is_none() { + self.batch_record.set_first_block(block.number); + } + + let EthExecuteOutput { receipts, requests, gas_used: _ } = + self.executor.execute_without_verification(block, total_difficulty)?; + + // prepare the state according to the prune mode + let retention = self.batch_record.bundle_retention(block.number); + self.executor.state.merge_transitions(retention); + + // validation should done externally + + Ok((receipts, requests)) + } + fn execute_and_verify_one(&mut self, input: Self::Input<'_>) -> Result<(), Self::Error> { let BlockExecutionInput { block, total_difficulty } = input; diff --git a/crates/evm/src/execute.rs b/crates/evm/src/execute.rs index 2109d557f8..5d30143a6b 100644 --- a/crates/evm/src/execute.rs +++ b/crates/evm/src/execute.rs @@ -7,7 +7,7 @@ pub use reth_storage_errors::provider::ProviderError; use core::fmt::Display; -use reth_primitives::{BlockNumber, BlockWithSenders, Receipt}; +use reth_primitives::{BlockNumber, BlockWithSenders, Receipt, Receipts, Request, Requests}; use reth_prune_types::PruneModes; use revm_primitives::db::Database; @@ -44,6 +44,15 @@ pub trait BatchExecutor { /// The error type returned by the executor. type Error; + /// Executes the next block in the batch and updates the state internally. Returns the + /// unverified output. + fn execute( + &mut self, + input: Self::Input<'_>, + ) -> Result<(Vec, Vec), Self::Error> { + todo!() + } + /// Executes the next block in the batch, verifies the output and updates the state internally. fn execute_and_verify_one(&mut self, input: Self::Input<'_>) -> Result<(), Self::Error>; diff --git a/crates/stages/stages/Cargo.toml b/crates/stages/stages/Cargo.toml index 934e9b1090..ff50b0fcaf 100644 --- a/crates/stages/stages/Cargo.toml +++ b/crates/stages/stages/Cargo.toml @@ -26,6 +26,7 @@ reth-network-p2p.workspace = true reth-primitives = { workspace = true, features = ["secp256k1"] } reth-primitives-traits.workspace = true reth-provider.workspace = true +reth-execution-errors.workspace = true reth-execution-types.workspace = true reth-prune.workspace = true reth-prune-types.workspace = true @@ -34,6 +35,7 @@ reth-revm.workspace = true reth-stages-api.workspace = true reth-trie = { workspace = true, features = ["metrics"] } reth-trie-db = { workspace = true, features = ["metrics"] } +reth-ethereum-consensus.workspace = true reth-testing-utils = { workspace = true, optional = true } diff --git a/crates/stages/stages/src/stages/execution.rs b/crates/stages/stages/src/stages/execution.rs index b3d2122661..279e0b573b 100644 --- a/crates/stages/stages/src/stages/execution.rs +++ b/crates/stages/stages/src/stages/execution.rs @@ -3,10 +3,14 @@ use num_traits::Zero; use reth_config::config::ExecutionConfig; use reth_db::{static_file::HeaderMask, tables}; use reth_db_api::{cursor::DbCursorRO, database::Database, transaction::DbTx}; +use reth_ethereum_consensus::validate_block_post_execution; use reth_evm::execute::{BatchExecutor, BlockExecutorProvider}; +use reth_execution_errors::BlockExecutionError; use reth_execution_types::{Chain, ExecutionOutcome}; use reth_exex::{ExExManagerHandle, ExExNotification}; -use reth_primitives::{BlockNumber, Header, StaticFileSegment}; +use reth_primitives::{ + BlockNumber, BlockWithSenders, Header, Receipt, Receipts, Request, Requests, StaticFileSegment, +}; use reth_primitives_traits::format_gas_throughput; use reth_provider::{ providers::{StaticFileProvider, StaticFileProviderRWRefMut, StaticFileWriter}, @@ -14,6 +18,7 @@ use reth_provider::{ BlockReader, DatabaseProviderRW, HeaderProvider, LatestStateProviderRef, OriginalValuesKnown, ProviderError, StateWriter, StatsReader, TransactionVariant, }; +use reth_prune::{PruneMode, MINIMUM_PRUNING_DISTANCE}; use reth_prune_types::PruneModes; use reth_revm::database::StateProviderDatabase; use reth_stages_api::{ @@ -23,11 +28,13 @@ use reth_stages_api::{ }; use std::{ cmp::Ordering, + collections::HashSet, ops::RangeInclusive, sync::Arc, task::{ready, Context, Poll}, time::{Duration, Instant}, }; +use tokio::sync::oneshot::error::TryRecvError; use tracing::*; /// The execution stage executes all transactions and @@ -226,7 +233,7 @@ where )); let mut executor = self.executor_provider.batch_executor(db); executor.set_tip(max_block); - executor.set_prune_modes(prune_modes); + executor.set_prune_modes(prune_modes.clone()); // Progress tracking let mut stage_progress = start_block; @@ -248,6 +255,41 @@ where let mut cumulative_gas = 0; let batch_start = Instant::now(); + let (receipts_tx, mut receipts_rx) = tokio::sync::oneshot::channel(); + let (validation_tx, mut validation_rx) = tokio::sync::mpsc::unbounded_channel::<( + Arc, + Vec, + Vec, + PruneModes, + )>(); + let chainspec = provider.chain_spec().clone(); + + tokio::task::spawn(async move { + let mut all_requests: Vec = vec![]; + let mut all_receipts = + Receipts { receipt_vec: Vec::with_capacity((max_block - start_block) as usize) }; + + while let Some((block, receipts, requests, _)) = validation_rx.recv().await { + if let Err(err) = validate_block_post_execution( + &block, + &chainspec, + receipts.as_slice(), + requests.as_slice(), + ) { + let _ = receipts_tx.send(Err(StageError::Block { + block: Box::new(block.header.clone().seal_slow()), + error: BlockErrorKind::Execution(BlockExecutionError::Consensus(err)), + })); + return + } + // TODO: pruner + all_receipts.push(receipts.into_iter().map(Some).collect()); + all_requests.push(requests.into()) + } + + let _ = receipts_tx.send(Ok((all_receipts, all_requests))); + }); + let mut blocks = Vec::new(); for block_number in start_block..=max_block { // Fetch the block @@ -258,9 +300,11 @@ where .ok_or_else(|| ProviderError::HeaderNotFound(block_number.into()))?; // we need the block's transactions but we don't need the transaction hashes - let block = provider - .block_with_senders(block_number.into(), TransactionVariant::NoHash)? - .ok_or_else(|| ProviderError::HeaderNotFound(block_number.into()))?; + let block = Arc::new( + provider + .block_with_senders(block_number.into(), TransactionVariant::NoHash)? + .ok_or_else(|| ProviderError::HeaderNotFound(block_number.into()))?, + ); fetch_block_duration += fetch_block_start.elapsed(); @@ -272,12 +316,15 @@ where // Execute the block let execute_start = Instant::now(); - executor.execute_and_verify_one((&block, td).into()).map_err(|error| { - StageError::Block { - block: Box::new(block.header.clone().seal_slow()), - error: BlockErrorKind::Execution(error), - } - })?; + let (receipts, requests) = + executor.execute((block.as_ref(), td).into()).map_err(|error| { + StageError::Block { + block: Box::new(block.header.clone().seal_slow()), + error: BlockErrorKind::Execution(error), + } + })?; + let _ = validation_tx.send((block.clone(), receipts, requests, prune_modes.clone())); + execution_duration += execute_start.elapsed(); // Log execution throughput @@ -320,11 +367,40 @@ where ) { break } + + // Check if previous block validations have failed + if let Ok(err) = receipts_rx.try_recv() { + // If we have received a message inside the loop, it means that a previous block + // failed its validation. + err?; + unreachable!(); + } } + // By dropping tx, the validation background task will end by sending all accumulated + // receipts and requests + drop(validation_tx); + let (receipts, requests) = loop { + match receipts_rx.try_recv() { + Ok(result) => { + break result?; + } + Err(TryRecvError::Empty) => { + // sleep for a bit to avoid busy-waiting + std::thread::sleep(Duration::from_millis(10)); + continue + } + _ => unreachable!(), + } + }; + debug!( + target: "sync::stages::execution", + "Received validated receipts and requests" + ); + // prepare execution output for writing let time = Instant::now(); - let ExecutionOutcome { bundle, receipts, requests, first_block } = executor.finalize(); + let ExecutionOutcome { bundle, first_block, .. } = executor.finalize(); let state = ExecutionOutcome::new(bundle, receipts, first_block, requests); let write_preparation_duration = time.elapsed(); @@ -344,7 +420,7 @@ where if !blocks.is_empty() { let blocks = blocks.into_iter().map(|block| { let hash = block.header.hash_slow(); - block.seal(hash) + Arc::try_unwrap(block).expect("validation task has ended").seal(hash) }); let previous_input =