diff --git a/crates/stages/api/src/error.rs b/crates/stages/api/src/error.rs index b12f5186f3..9a4ef35aaf 100644 --- a/crates/stages/api/src/error.rs +++ b/crates/stages/api/src/error.rs @@ -1,5 +1,4 @@ use crate::PipelineEvent; -use alloy_primitives::TxNumber; use reth_consensus::ConsensusError; use reth_errors::{BlockExecutionError, DatabaseError, RethError}; use reth_network_p2p::error::DownloadError; @@ -100,18 +99,6 @@ pub enum StageError { /// Static File segment segment: StaticFileSegment, }, - /// Unrecoverable inconsistency error related to a transaction number in a static file segment. - #[error( - "inconsistent transaction number for {segment}. db: {database}, static_file: {static_file}" - )] - InconsistentTxNumber { - /// Static File segment where this error was encountered. - segment: StaticFileSegment, - /// Expected database transaction number. - database: TxNumber, - /// Expected static file transaction number. - static_file: TxNumber, - }, /// The prune checkpoint for the given segment is missing. #[error("missing prune checkpoint for {0}")] MissingPruneCheckpoint(PruneSegment), @@ -146,7 +133,6 @@ impl StageError { Self::MissingDownloadBuffer | Self::MissingSyncGap | Self::ChannelClosed | - Self::InconsistentTxNumber { .. } | Self::Internal(_) | Self::Fatal(_) ) diff --git a/crates/stages/stages/src/stages/bodies.rs b/crates/stages/stages/src/stages/bodies.rs index 48bc679f5b..07b9757497 100644 --- a/crates/stages/stages/src/stages/bodies.rs +++ b/crates/stages/stages/src/stages/bodies.rs @@ -183,18 +183,7 @@ where BlockResponse::Full(block) => { // Write transactions for transaction in block.body.transactions() { - let appended_tx_number = - static_file_producer.append_transaction(next_tx_num, transaction)?; - - if appended_tx_number != next_tx_num { - // This scenario indicates a critical error in the logic of adding new - // items. It should be treated as an `expect()` failure. - return Err(StageError::InconsistentTxNumber { - segment: StaticFileSegment::Transactions, - database: next_tx_num, - static_file: appended_tx_number, - }) - } + static_file_producer.append_transaction(next_tx_num, transaction)?; // Increment transaction id for each transaction. next_tx_num += 1; diff --git a/crates/storage/errors/src/provider.rs b/crates/storage/errors/src/provider.rs index d60a2adb92..b6fcee545d 100644 --- a/crates/storage/errors/src/provider.rs +++ b/crates/storage/errors/src/provider.rs @@ -133,6 +133,9 @@ pub enum ProviderError { /// Trying to insert data from an unexpected block number. #[display("trying to append data to {_0} as block #{_1} but expected block #{_2}")] UnexpectedStaticFileBlockNumber(StaticFileSegment, BlockNumber, BlockNumber), + /// Trying to insert data from an unexpected block number. + #[display("trying to append row to {_0} at index #{_1} but expected index #{_2}")] + UnexpectedStaticFileTxNumber(StaticFileSegment, TxNumber, TxNumber), /// Static File Provider was initialized as read-only. #[display("cannot get a writer on a read-only environment.")] ReadOnlyStaticFileAccess, diff --git a/crates/storage/provider/src/providers/static_file/writer.rs b/crates/storage/provider/src/providers/static_file/writer.rs index ef01bd773c..5951dbb751 100644 --- a/crates/storage/provider/src/providers/static_file/writer.rs +++ b/crates/storage/provider/src/providers/static_file/writer.rs @@ -498,16 +498,24 @@ impl StaticFileProviderRW { &mut self, tx_num: TxNumber, value: V, - ) -> ProviderResult { - if self.writer.user_header().tx_range().is_none() { - self.writer.user_header_mut().set_tx_range(tx_num, tx_num); - } else { + ) -> ProviderResult<()> { + if let Some(range) = self.writer.user_header().tx_range() { + let next_tx = range.end() + 1; + if next_tx != tx_num { + return Err(ProviderError::UnexpectedStaticFileTxNumber( + self.writer.user_header().segment(), + tx_num, + next_tx, + )) + } self.writer.user_header_mut().increment_tx(); + } else { + self.writer.user_header_mut().set_tx_range(tx_num, tx_num); } self.append_column(value)?; - Ok(self.writer.user_header().tx_end().expect("qed")) + Ok(()) } /// Appends header to static file. @@ -550,16 +558,12 @@ impl StaticFileProviderRW { /// empty blocks and this function wouldn't be called. /// /// Returns the current [`TxNumber`] as seen in the static file. - pub fn append_transaction( - &mut self, - tx_num: TxNumber, - tx: impl Compact, - ) -> ProviderResult { + pub fn append_transaction(&mut self, tx_num: TxNumber, tx: impl Compact) -> ProviderResult<()> { let start = Instant::now(); self.ensure_no_queued_prune()?; debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Transactions); - let result = self.append_with_tx_number(tx_num, tx)?; + self.append_with_tx_number(tx_num, tx)?; if let Some(metrics) = &self.metrics { metrics.record_segment_operation( @@ -569,7 +573,7 @@ impl StaticFileProviderRW { ); } - Ok(result) + Ok(()) } /// Appends receipt to static file. @@ -578,16 +582,12 @@ impl StaticFileProviderRW { /// empty blocks and this function wouldn't be called. /// /// Returns the current [`TxNumber`] as seen in the static file. - pub fn append_receipt( - &mut self, - tx_num: TxNumber, - receipt: &Receipt, - ) -> ProviderResult { + pub fn append_receipt(&mut self, tx_num: TxNumber, receipt: &Receipt) -> ProviderResult<()> { let start = Instant::now(); self.ensure_no_queued_prune()?; debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Receipts); - let result = self.append_with_tx_number(tx_num, receipt)?; + self.append_with_tx_number(tx_num, receipt)?; if let Some(metrics) = &self.metrics { metrics.record_segment_operation( @@ -597,7 +597,7 @@ impl StaticFileProviderRW { ); } - Ok(result) + Ok(()) } /// Appends multiple receipts to the static file. @@ -625,7 +625,8 @@ impl StaticFileProviderRW { for receipt_result in receipts_iter { let (tx_num, receipt) = receipt_result?; - tx_number = self.append_with_tx_number(tx_num, receipt.borrow())?; + self.append_with_tx_number(tx_num, receipt.borrow())?; + tx_number = tx_num; count += 1; }