diff --git a/crates/primitives/src/block.rs b/crates/primitives/src/block.rs index 730155361d..8489871fe8 100644 --- a/crates/primitives/src/block.rs +++ b/crates/primitives/src/block.rs @@ -40,6 +40,13 @@ impl Block { withdrawals: self.withdrawals, } } + + /// Transform into a [`BlockWithSenders`]. + pub fn with_senders(self, senders: Vec
) -> BlockWithSenders { + assert_eq!(self.body.len(), senders.len(), "Unequal number of senders"); + + BlockWithSenders { block: self, senders } + } } impl Deref for Block { @@ -49,6 +56,41 @@ impl Deref for Block { } } +/// Sealed block with senders recovered from transactions. +#[derive(Debug, Clone, PartialEq, Eq, Default)] +pub struct BlockWithSenders { + /// Block + pub block: Block, + /// List of senders that match the transactions in the block + pub senders: Vec
, +} + +impl BlockWithSenders { + /// New block with senders. Return none if len of tx and senders does not match + pub fn new(block: Block, senders: Vec
) -> Option { + (!block.body.len() != senders.len()).then_some(Self { block, senders }) + } + + /// Split Structure to its components + pub fn into_components(self) -> (Block, Vec
) { + (self.block, self.senders) + } +} + +impl Deref for BlockWithSenders { + type Target = Block; + fn deref(&self) -> &Self::Target { + &self.block + } +} + +#[cfg(any(test, feature = "test-utils"))] +impl std::ops::DerefMut for BlockWithSenders { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.block + } +} + /// Sealed Ethereum full block. /// /// Withdrawals can be optionally included at the end of the RLP encoded message. diff --git a/crates/primitives/src/lib.rs b/crates/primitives/src/lib.rs index 81ad1189e3..7126e71d8c 100644 --- a/crates/primitives/src/lib.rs +++ b/crates/primitives/src/lib.rs @@ -39,8 +39,8 @@ pub mod proofs; pub use account::{Account, Bytecode}; pub use bits::H512; pub use block::{ - Block, BlockBody, BlockHashOrNumber, BlockId, BlockNumHash, BlockNumberOrTag, SealedBlock, - SealedBlockWithSenders, + Block, BlockBody, BlockHashOrNumber, BlockId, BlockNumHash, BlockNumberOrTag, BlockWithSenders, + SealedBlock, SealedBlockWithSenders, }; pub use bloom::Bloom; pub use chain::{ diff --git a/crates/stages/src/stages/execution.rs b/crates/stages/src/stages/execution.rs index 341ec40624..1182e1286c 100644 --- a/crates/stages/src/stages/execution.rs +++ b/crates/stages/src/stages/execution.rs @@ -10,9 +10,8 @@ use reth_db::{ tables, transaction::{DbTx, DbTxMut}, }; -use reth_interfaces::provider::ProviderError; use reth_metrics_derive::Metrics; -use reth_primitives::{Address, Block, U256}; +use reth_primitives::{Address, Block, BlockNumber, BlockWithSenders, U256}; use reth_provider::{ post_state::PostState, BlockExecutor, ExecutorFactory, LatestStateProviderRef, Transaction, }; @@ -85,6 +84,27 @@ impl ExecutionStage { } } + // TODO: This should be in the block provider trait once we consolidate + // SharedDatabase/Transaction + fn read_block_with_senders( + tx: &Transaction<'_, DB>, + block_number: BlockNumber, + ) -> Result<(BlockWithSenders, U256), StageError> { + let header = tx.get_header(block_number)?; + let td = tx.get_td(block_number)?; + let ommers = tx.get::(block_number)?.unwrap_or_default().ommers; + let withdrawals = tx.get::(block_number)?.map(|v| v.withdrawals); + + let (transactions, senders): (Vec<_>, Vec<_>) = tx + .get_block_transaction_range(block_number..=block_number)? + .into_iter() + .flat_map(|(_, txs)| txs.into_iter()) + .map(|tx| tx.to_components()) + .unzip(); + + Ok((Block { header, body: transactions, ommers, withdrawals }.with_senders(senders), td)) + } + /// Execute the stage. pub fn execute_inner( &self, @@ -95,83 +115,19 @@ impl ExecutionStage { exec_or_return!(input, self.commit_threshold, "sync::stages::execution"); let last_block = input.stage_progress.unwrap_or_default(); - // Get header with canonical hashes. - let mut headers_cursor = tx.cursor_read::()?; - // Get total difficulty - let mut td_cursor = tx.cursor_read::()?; - // Get bodies with canonical hashes. - let mut bodies_cursor = tx.cursor_read::()?; - // Get ommers with canonical hashes. - let mut ommers_cursor = tx.cursor_read::()?; - // Get block withdrawals. - let mut withdrawals_cursor = tx.cursor_read::()?; - // Get transaction of the block that we are executing. - let mut tx_cursor = tx.cursor_read::()?; - // Skip sender recovery and load signer from database. - let mut tx_sender = tx.cursor_read::()?; - // Get block headers and bodies - let block_batch = headers_cursor - .walk_range(start_block..=end_block)? - .map(|entry| -> Result<_, StageError> { - let (number, header) = entry?; - let (_, td) = td_cursor - .seek_exact(number)? - .ok_or(ProviderError::TotalDifficulty { number })?; - let (_, body) = - bodies_cursor.seek_exact(number)?.ok_or(ProviderError::BlockBody { number })?; - let (_, stored_ommers) = ommers_cursor.seek_exact(number)?.unwrap_or_default(); - let withdrawals = - withdrawals_cursor.seek_exact(number)?.map(|(_, w)| w.withdrawals); - Ok((header, td.into(), body, stored_ommers.ommers, withdrawals)) - }) - .collect::, _>>()?; - // Create state provider with cached state - let mut executor = self.executor_factory.with_sp(LatestStateProviderRef::new(&**tx)); // Fetch transactions, execute them and generate results let mut state = PostState::default(); - for (header, td, body, ommers, withdrawals) in block_batch.into_iter() { - let block_number = header.number; - tracing::trace!(target: "sync::stages::execution", ?block_number, "Execute block."); - - // iterate over all transactions - let mut tx_walker = tx_cursor.walk(Some(body.start_tx_id))?; - let mut transactions = Vec::with_capacity(body.tx_count as usize); - // get next N transactions. - for index in body.tx_id_range() { - let (tx_index, tx) = - tx_walker.next().ok_or(ProviderError::EndOfTransactionTable)??; - if tx_index != index { - error!(target: "sync::stages::execution", block = block_number, expected = index, found = tx_index, ?body, "Transaction gap"); - return Err(ProviderError::TransactionsGap { missing: tx_index }.into()) - } - transactions.push(tx); - } - - // take signers - let mut tx_sender_walker = tx_sender.walk(Some(body.start_tx_id))?; - let mut signers = Vec::with_capacity(body.tx_count as usize); - for index in body.tx_id_range() { - let (tx_index, tx) = - tx_sender_walker.next().ok_or(ProviderError::EndOfTransactionSenderTable)??; - if tx_index != index { - error!(target: "sync::stages::execution", block = block_number, expected = index, found = tx_index, ?body, "Signer gap"); - return Err(ProviderError::TransactionsSignerGap { missing: tx_index }.into()) - } - signers.push(tx); - } - - trace!(target: "sync::stages::execution", number = block_number, txs = transactions.len(), "Executing block"); + for block_number in start_block..=end_block { + let (block, td) = Self::read_block_with_senders(tx, block_number)?; // Configure the executor to use the current state. + trace!(target: "sync::stages::execution", number = block_number, txs = block.body.len(), "Executing block"); + let (block, senders) = block.into_components(); let block_state = executor - .execute_and_verify_receipt( - &Block { header, body: transactions, ommers, withdrawals }, - td, - Some(signers), - ) + .execute_and_verify_receipt(&block, td, Some(senders)) .map_err(|error| StageError::ExecutionError { block: block_number, error })?; if let Some(last_receipt) = block_state.receipts().last() { self.metrics