From c0fb169da497aff4e44a234fa041c1bba484751d Mon Sep 17 00:00:00 2001 From: Dan Cline <6798349+Rjected@users.noreply.github.com> Date: Tue, 6 Jun 2023 16:17:12 -0400 Subject: [PATCH] fix: unwind on execution and senders errors (#2938) Co-authored-by: Roman Krasiuk --- crates/blockchain-tree/src/blockchain_tree.rs | 20 +++---- crates/consensus/beacon/src/engine/mod.rs | 18 ++++--- .../interfaces/src/blockchain_tree/error.rs | 12 ++++- crates/interfaces/src/executor.rs | 13 +++-- crates/interfaces/src/provider.rs | 3 ++ crates/revm/src/executor.rs | 30 ++++++----- crates/stages/src/pipeline/mod.rs | 20 +++++++ crates/stages/src/stages/sender_recovery.rs | 54 ++++++++++++++----- crates/storage/provider/src/transaction.rs | 11 +++- 9 files changed, 133 insertions(+), 48 deletions(-) diff --git a/crates/blockchain-tree/src/blockchain_tree.rs b/crates/blockchain-tree/src/blockchain_tree.rs index f301b7f710..f3a428f006 100644 --- a/crates/blockchain-tree/src/blockchain_tree.rs +++ b/crates/blockchain-tree/src/blockchain_tree.rs @@ -11,7 +11,7 @@ use reth_interfaces::{ BlockStatus, CanonicalOutcome, }, consensus::{Consensus, ConsensusError}, - executor::BlockExecutionError, + executor::{BlockExecutionError, BlockValidationError}, Error, }; use reth_primitives::{ @@ -378,7 +378,7 @@ impl BlockchainTree if !self.externals.chain_spec.fork(Hardfork::Paris).active_at_ttd(parent_td, U256::ZERO) { return Err(InsertBlockError::execution_error( - BlockExecutionError::BlockPreMerge { hash: block.hash }, + BlockValidationError::BlockPreMerge { hash: block.hash }.into(), block.block, )) } @@ -869,14 +869,16 @@ impl BlockchainTree // If block is already canonical don't return error. if let Some(header) = self.find_canonical_header(block_hash)? { info!(target: "blockchain_tree", ?block_hash, "Block is already canonical, ignoring."); - let td = self - .externals - .database() - .provider()? - .header_td(block_hash)? - .ok_or(BlockExecutionError::MissingTotalDifficulty { hash: *block_hash })?; + let td = self.externals.database().provider()?.header_td(block_hash)?.ok_or( + BlockExecutionError::from(BlockValidationError::MissingTotalDifficulty { + hash: *block_hash, + }), + )?; if !self.externals.chain_spec.fork(Hardfork::Paris).active_at_ttd(td, U256::ZERO) { - return Err(BlockExecutionError::BlockPreMerge { hash: *block_hash }.into()) + return Err(BlockExecutionError::from(BlockValidationError::BlockPreMerge { + hash: *block_hash, + }) + .into()) } return Ok(CanonicalOutcome::AlreadyCanonical { header }) } diff --git a/crates/consensus/beacon/src/engine/mod.rs b/crates/consensus/beacon/src/engine/mod.rs index e0166d91d9..aea1c8bf66 100644 --- a/crates/consensus/beacon/src/engine/mod.rs +++ b/crates/consensus/beacon/src/engine/mod.rs @@ -10,7 +10,7 @@ use reth_interfaces::{ BlockStatus, BlockchainTreeEngine, }, consensus::ForkchoiceState, - executor::BlockExecutionError, + executor::{BlockExecutionError, BlockValidationError}, p2p::{bodies::client::BodiesClient, headers::client::HeadersClient}, sync::{NetworkSyncUpdater, SyncState}, Error, @@ -642,7 +642,11 @@ where #[allow(clippy::single_match)] match &error { - Error::Execution(error @ BlockExecutionError::BlockPreMerge { .. }) => { + Error::Execution( + error @ BlockExecutionError::Validation(BlockValidationError::BlockPreMerge { + .. + }), + ) => { return PayloadStatus::from_status(PayloadStatusEnum::Invalid { validation_error: error.to_string(), }) @@ -1793,7 +1797,7 @@ mod tests { }) .await; let expected_result = ForkchoiceUpdated::from_status(PayloadStatusEnum::Invalid { - validation_error: BlockExecutionError::BlockPreMerge { hash: block1.hash } + validation_error: BlockValidationError::BlockPreMerge { hash: block1.hash } .to_string(), }) .with_latest_valid_hash(H256::zero()); @@ -1803,9 +1807,7 @@ mod tests { mod new_payload { use super::*; - use reth_interfaces::{ - executor::BlockExecutionError, test_utils::generators::random_block, - }; + use reth_interfaces::test_utils::generators::random_block; use reth_primitives::{Hardfork, U256}; use reth_provider::test_utils::blocks::BlockChainTestData; @@ -1977,7 +1979,7 @@ mod tests { .await; let expected_result = PayloadStatus::from_status(PayloadStatusEnum::Invalid { - validation_error: BlockExecutionError::BlockPreMerge { hash: block1.hash } + validation_error: BlockValidationError::BlockPreMerge { hash: block1.hash } .to_string(), }) .with_latest_valid_hash(H256::zero()); @@ -1988,7 +1990,7 @@ mod tests { env.send_new_payload_retry_on_syncing(block2.clone().into()).await.unwrap(); let expected_result = PayloadStatus::from_status(PayloadStatusEnum::Invalid { - validation_error: BlockExecutionError::BlockPreMerge { hash: block2.hash } + validation_error: BlockValidationError::BlockPreMerge { hash: block2.hash } .to_string(), }) .with_latest_valid_hash(H256::zero()); diff --git a/crates/interfaces/src/blockchain_tree/error.rs b/crates/interfaces/src/blockchain_tree/error.rs index 36012996b6..e9bee5105e 100644 --- a/crates/interfaces/src/blockchain_tree/error.rs +++ b/crates/interfaces/src/blockchain_tree/error.rs @@ -1,6 +1,9 @@ //! Error handling for the blockchain tree -use crate::{consensus::ConsensusError, executor::BlockExecutionError}; +use crate::{ + consensus::ConsensusError, + executor::{BlockExecutionError, BlockValidationError}, +}; use reth_primitives::{BlockHash, BlockNumber, SealedBlock}; /// Various error cases that can occur when a block violates tree assumptions. @@ -173,7 +176,12 @@ impl InsertBlockErrorKind { /// Returns true if this is a block pre merge error. pub fn is_block_pre_merge(&self) -> bool { - matches!(self, InsertBlockErrorKind::Execution(BlockExecutionError::BlockPreMerge { .. })) + matches!( + self, + InsertBlockErrorKind::Execution(BlockExecutionError::Validation( + BlockValidationError::BlockPreMerge { .. } + )) + ) } /// Returns true if the error is an execution error diff --git a/crates/interfaces/src/executor.rs b/crates/interfaces/src/executor.rs index 4a92d88a6f..cc747e29fd 100644 --- a/crates/interfaces/src/executor.rs +++ b/crates/interfaces/src/executor.rs @@ -1,11 +1,10 @@ use reth_primitives::{BlockHash, BlockNumHash, Bloom, H256}; use thiserror::Error; -/// BlockExecutor Errors +/// Transaction validation errors #[allow(missing_docs)] #[derive(Error, Debug, Clone, PartialEq, Eq)] -pub enum BlockExecutionError { - // === validation errors === +pub enum BlockValidationError { #[error("EVM reported invalid transaction ({hash:?}): {message}")] EVM { hash: H256, message: String }, #[error("Failed to recover sender for transaction")] @@ -25,6 +24,14 @@ pub enum BlockExecutionError { BlockPreMerge { hash: H256 }, #[error("Missing total difficulty")] MissingTotalDifficulty { hash: H256 }, +} + +/// BlockExecutor Errors +#[allow(missing_docs)] +#[derive(Error, Debug, Clone, PartialEq, Eq)] +pub enum BlockExecutionError { + #[error(transparent)] + Validation(#[from] BlockValidationError), // === misc provider error === #[error("Provider error")] diff --git a/crates/interfaces/src/provider.rs b/crates/interfaces/src/provider.rs index e482819c04..c8ca9f1d86 100644 --- a/crates/interfaces/src/provider.rs +++ b/crates/interfaces/src/provider.rs @@ -62,4 +62,7 @@ pub enum ProviderError { /// Unable to compute state root on top of historical block #[error("Unable to compute state root on top of historical block")] StateRootNotAvailableForHistoricalBlock, + /// Unable to find the block number for a given transaction index + #[error("Unable to find the block number for a given transaction index")] + BlockNumberForTransactionIndexNotFound, } diff --git a/crates/revm/src/executor.rs b/crates/revm/src/executor.rs index 7a078569ef..fb5d338bdb 100644 --- a/crates/revm/src/executor.rs +++ b/crates/revm/src/executor.rs @@ -7,7 +7,7 @@ use crate::{ to_reth_acc, }; use reth_consensus_common::calc; -use reth_interfaces::executor::BlockExecutionError; +use reth_interfaces::executor::{BlockExecutionError, BlockValidationError}; use reth_primitives::{ Account, Address, Block, BlockNumber, Bloom, Bytecode, ChainSpec, Hardfork, Header, Receipt, ReceiptWithBloom, TransactionSigned, Withdrawal, H256, U256, @@ -81,11 +81,13 @@ where if body.len() == senders.len() { Ok(senders) } else { - Err(BlockExecutionError::SenderRecoveryError) + Err(BlockValidationError::SenderRecoveryError.into()) } } else { body.iter() - .map(|tx| tx.recover_signer().ok_or(BlockExecutionError::SenderRecoveryError)) + .map(|tx| { + tx.recover_signer().ok_or(BlockValidationError::SenderRecoveryError.into()) + }) .collect() } } @@ -198,7 +200,7 @@ where // main execution. self.evm.transact() }; - out.map_err(|e| BlockExecutionError::EVM { hash, message: format!("{e:?}") }) + out.map_err(|e| BlockValidationError::EVM { hash, message: format!("{e:?}") }.into()) } /// Runs the provided transactions and commits their state to the run-time database. @@ -232,10 +234,11 @@ where // must be no greater than the block’s gasLimit. let block_available_gas = block.header.gas_limit - cumulative_gas_used; if transaction.gas_limit() > block_available_gas { - return Err(BlockExecutionError::TransactionGasLimitMoreThanAvailableBlockGas { + return Err(BlockValidationError::TransactionGasLimitMoreThanAvailableBlockGas { transaction_gas_limit: transaction.gas_limit(), block_available_gas, - }) + } + .into()) } // Execute transaction. let ResultAndState { result, state } = self.transact(transaction, sender)?; @@ -285,10 +288,11 @@ where // Check if gas used matches the value set in header. if block.gas_used != cumulative_gas_used { - return Err(BlockExecutionError::BlockGasUsed { + return Err(BlockValidationError::BlockGasUsed { got: cumulative_gas_used, expected: block.gas_used, - }) + } + .into()) } // Add block rewards @@ -525,19 +529,21 @@ pub fn verify_receipt<'a>( let receipts_with_bloom = receipts.map(|r| r.clone().into()).collect::>(); let receipts_root = reth_primitives::proofs::calculate_receipt_root(&receipts_with_bloom); if receipts_root != expected_receipts_root { - return Err(BlockExecutionError::ReceiptRootDiff { + return Err(BlockValidationError::ReceiptRootDiff { got: receipts_root, expected: expected_receipts_root, - }) + } + .into()) } // Create header log bloom. let logs_bloom = receipts_with_bloom.iter().fold(Bloom::zero(), |bloom, r| bloom | r.bloom); if logs_bloom != expected_logs_bloom { - return Err(BlockExecutionError::BloomLogDiff { + return Err(BlockValidationError::BloomLogDiff { expected: Box::new(expected_logs_bloom), got: Box::new(logs_bloom), - }) + } + .into()) } Ok(()) diff --git a/crates/stages/src/pipeline/mod.rs b/crates/stages/src/pipeline/mod.rs index 27502f4c63..c1a48dda13 100644 --- a/crates/stages/src/pipeline/mod.rs +++ b/crates/stages/src/pipeline/mod.rs @@ -1,6 +1,7 @@ use crate::{error::*, ExecInput, ExecOutput, Stage, StageError, UnwindInput}; use futures_util::Future; use reth_db::database::Database; +use reth_interfaces::executor::BlockExecutionError; use reth_primitives::{listener::EventListeners, stage::StageId, BlockNumber, H256}; use reth_provider::{providers::get_stage_checkpoint, Transaction}; use std::pin::Pin; @@ -390,6 +391,25 @@ where target: prev_checkpoint.unwrap_or_default().block_number, bad_block: block, }) + } else if let StageError::ExecutionError { + block, + error: BlockExecutionError::Validation(error), + } = err + { + warn!( + target: "sync::pipeline", + stage = %stage_id, + bad_block = %block.number, + "Stage encountered an execution error: {error}" + ); + + // We unwind because of an execution error. If the unwind itself fails, we + // bail entirely, otherwise we restart the execution loop from the + // beginning. + Ok(ControlFlow::Unwind { + target: prev_checkpoint.unwrap_or_default().block_number, + bad_block: block, + }) } else if err.is_fatal() { error!( target: "sync::pipeline", diff --git a/crates/stages/src/stages/sender_recovery.rs b/crates/stages/src/stages/sender_recovery.rs index d08627c07c..3cc0f12153 100644 --- a/crates/stages/src/stages/sender_recovery.rs +++ b/crates/stages/src/stages/sender_recovery.rs @@ -7,12 +7,13 @@ use reth_db::{ transaction::{DbTx, DbTxMut}, DatabaseError, RawKey, RawTable, RawValue, }; +use reth_interfaces::consensus; use reth_primitives::{ keccak256, stage::{EntitiesCheckpoint, StageCheckpoint, StageId}, TransactionSignedNoHash, TxNumber, H160, }; -use reth_provider::Transaction; +use reth_provider::{ProviderError, Transaction}; use std::{fmt::Debug, ops::Deref}; use thiserror::Error; use tokio::sync::mpsc; @@ -116,15 +117,18 @@ impl Stage for SenderRecoveryStage { DatabaseError, >, rlp_buf: &mut Vec| - -> Result<(u64, H160), Box> { - let (tx_id, transaction) = entry.map_err(|e| Box::new(e.into()))?; + -> Result<(u64, H160), Box> { + let (tx_id, transaction) = + entry.map_err(|e| Box::new(SenderRecoveryStageError::StageError(e.into())))?; let tx_id = tx_id.key().expect("key to be formated"); let tx = transaction.value().expect("value to be formated"); tx.transaction.encode_without_signature(rlp_buf); let sender = tx.signature.recover_signer(keccak256(rlp_buf)).ok_or( - StageError::from(SenderRecoveryStageError::SenderRecovery { tx: tx_id }), + SenderRecoveryStageError::FailedRecovery(FailedSenderRecoveryError { + tx: tx_id, + }), )?; Ok((tx_id, sender)) @@ -144,7 +148,29 @@ impl Stage for SenderRecoveryStage { // Iterate over channels and append the sender in the order that they are received. for mut channel in channels { while let Some(recovered) = channel.recv().await { - let (tx_id, sender) = recovered.map_err(|boxed| *boxed)?; + let (tx_id, sender) = match recovered { + Ok(result) => result, + Err(error) => { + match *error { + SenderRecoveryStageError::FailedRecovery(err) => { + // get the block number for the bad transaction + let block_number = tx + .get::(err.tx)? + .ok_or(ProviderError::BlockNumberForTransactionIndexNotFound)?; + + // fetch the sealed header so we can use it in the sender recovery + // unwind + let sealed_header = tx.get_sealed_header(block_number)?; + return Err(StageError::Validation { + block: sealed_header, + error: + consensus::ConsensusError::TransactionSignerRecoveryError, + }) + } + SenderRecoveryStageError::StageError(err) => return Err(err), + } + } + }; senders_cursor.append(tx_id, sender)?; } } @@ -187,17 +213,21 @@ fn stage_checkpoint( }) } -// TODO(onbjerg): Should unwind #[derive(Error, Debug)] +#[error(transparent)] enum SenderRecoveryStageError { - #[error("Sender recovery failed for transaction {tx}.")] - SenderRecovery { tx: TxNumber }, + /// A transaction failed sender recovery + FailedRecovery(FailedSenderRecoveryError), + + /// A different type of stage error occurred + StageError(#[from] StageError), } -impl From for StageError { - fn from(error: SenderRecoveryStageError) -> Self { - StageError::Fatal(Box::new(error)) - } +#[derive(Error, Debug)] +#[error("Sender recovery failed for transaction {tx}.")] +struct FailedSenderRecoveryError { + /// The transaction that failed sender recovery + tx: TxNumber, } #[cfg(test)] diff --git a/crates/storage/provider/src/transaction.rs b/crates/storage/provider/src/transaction.rs index ffe8534f22..bea656657c 100644 --- a/crates/storage/provider/src/transaction.rs +++ b/crates/storage/provider/src/transaction.rs @@ -23,8 +23,8 @@ use reth_primitives::{ keccak256, stage::{StageCheckpoint, StageId}, Account, Address, BlockHash, BlockNumber, ChainSpec, Hardfork, Header, SealedBlock, - SealedBlockWithSenders, StorageEntry, TransactionSigned, TransactionSignedEcRecovered, H256, - U256, + SealedBlockWithSenders, SealedHeader, StorageEntry, TransactionSigned, + TransactionSignedEcRecovered, H256, U256, }; use reth_trie::{StateRoot, StateRootError}; use std::{ @@ -183,6 +183,13 @@ where Ok(td.into()) } + /// Query the sealed header by number + pub fn get_sealed_header(&self, number: BlockNumber) -> Result { + let header = self.get_header(number)?; + let block_hash = self.get_block_hash(number)?; + Ok(header.seal(block_hash)) + } + /// Unwind table by some number key. /// Returns number of rows unwound. ///