feat: add tx_number consistency check to StaticFileProviderRW (#12570)

Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
This commit is contained in:
Arsenii Kulikov
2024-11-15 23:04:35 +04:00
committed by GitHub
parent 6e00e58426
commit f0b8b9b221
4 changed files with 25 additions and 46 deletions

View File

@@ -1,5 +1,4 @@
use crate::PipelineEvent;
use alloy_primitives::TxNumber;
use reth_consensus::ConsensusError;
use reth_errors::{BlockExecutionError, DatabaseError, RethError};
use reth_network_p2p::error::DownloadError;
@@ -100,18 +99,6 @@ pub enum StageError {
/// Static File segment
segment: StaticFileSegment,
},
/// Unrecoverable inconsistency error related to a transaction number in a static file segment.
#[error(
"inconsistent transaction number for {segment}. db: {database}, static_file: {static_file}"
)]
InconsistentTxNumber {
/// Static File segment where this error was encountered.
segment: StaticFileSegment,
/// Expected database transaction number.
database: TxNumber,
/// Expected static file transaction number.
static_file: TxNumber,
},
/// The prune checkpoint for the given segment is missing.
#[error("missing prune checkpoint for {0}")]
MissingPruneCheckpoint(PruneSegment),
@@ -146,7 +133,6 @@ impl StageError {
Self::MissingDownloadBuffer |
Self::MissingSyncGap |
Self::ChannelClosed |
Self::InconsistentTxNumber { .. } |
Self::Internal(_) |
Self::Fatal(_)
)

View File

@@ -183,18 +183,7 @@ where
BlockResponse::Full(block) => {
// Write transactions
for transaction in block.body.transactions() {
let appended_tx_number =
static_file_producer.append_transaction(next_tx_num, transaction)?;
if appended_tx_number != next_tx_num {
// This scenario indicates a critical error in the logic of adding new
// items. It should be treated as an `expect()` failure.
return Err(StageError::InconsistentTxNumber {
segment: StaticFileSegment::Transactions,
database: next_tx_num,
static_file: appended_tx_number,
})
}
static_file_producer.append_transaction(next_tx_num, transaction)?;
// Increment transaction id for each transaction.
next_tx_num += 1;

View File

@@ -133,6 +133,9 @@ pub enum ProviderError {
/// Trying to insert data from an unexpected block number.
#[display("trying to append data to {_0} as block #{_1} but expected block #{_2}")]
UnexpectedStaticFileBlockNumber(StaticFileSegment, BlockNumber, BlockNumber),
/// Trying to insert data from an unexpected block number.
#[display("trying to append row to {_0} at index #{_1} but expected index #{_2}")]
UnexpectedStaticFileTxNumber(StaticFileSegment, TxNumber, TxNumber),
/// Static File Provider was initialized as read-only.
#[display("cannot get a writer on a read-only environment.")]
ReadOnlyStaticFileAccess,

View File

@@ -498,16 +498,24 @@ impl<N: NodePrimitives> StaticFileProviderRW<N> {
&mut self,
tx_num: TxNumber,
value: V,
) -> ProviderResult<TxNumber> {
if self.writer.user_header().tx_range().is_none() {
self.writer.user_header_mut().set_tx_range(tx_num, tx_num);
} else {
) -> ProviderResult<()> {
if let Some(range) = self.writer.user_header().tx_range() {
let next_tx = range.end() + 1;
if next_tx != tx_num {
return Err(ProviderError::UnexpectedStaticFileTxNumber(
self.writer.user_header().segment(),
tx_num,
next_tx,
))
}
self.writer.user_header_mut().increment_tx();
} else {
self.writer.user_header_mut().set_tx_range(tx_num, tx_num);
}
self.append_column(value)?;
Ok(self.writer.user_header().tx_end().expect("qed"))
Ok(())
}
/// Appends header to static file.
@@ -550,16 +558,12 @@ impl<N: NodePrimitives> StaticFileProviderRW<N> {
/// empty blocks and this function wouldn't be called.
///
/// Returns the current [`TxNumber`] as seen in the static file.
pub fn append_transaction(
&mut self,
tx_num: TxNumber,
tx: impl Compact,
) -> ProviderResult<TxNumber> {
pub fn append_transaction(&mut self, tx_num: TxNumber, tx: impl Compact) -> ProviderResult<()> {
let start = Instant::now();
self.ensure_no_queued_prune()?;
debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Transactions);
let result = self.append_with_tx_number(tx_num, tx)?;
self.append_with_tx_number(tx_num, tx)?;
if let Some(metrics) = &self.metrics {
metrics.record_segment_operation(
@@ -569,7 +573,7 @@ impl<N: NodePrimitives> StaticFileProviderRW<N> {
);
}
Ok(result)
Ok(())
}
/// Appends receipt to static file.
@@ -578,16 +582,12 @@ impl<N: NodePrimitives> StaticFileProviderRW<N> {
/// empty blocks and this function wouldn't be called.
///
/// Returns the current [`TxNumber`] as seen in the static file.
pub fn append_receipt(
&mut self,
tx_num: TxNumber,
receipt: &Receipt,
) -> ProviderResult<TxNumber> {
pub fn append_receipt(&mut self, tx_num: TxNumber, receipt: &Receipt) -> ProviderResult<()> {
let start = Instant::now();
self.ensure_no_queued_prune()?;
debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Receipts);
let result = self.append_with_tx_number(tx_num, receipt)?;
self.append_with_tx_number(tx_num, receipt)?;
if let Some(metrics) = &self.metrics {
metrics.record_segment_operation(
@@ -597,7 +597,7 @@ impl<N: NodePrimitives> StaticFileProviderRW<N> {
);
}
Ok(result)
Ok(())
}
/// Appends multiple receipts to the static file.
@@ -625,7 +625,8 @@ impl<N: NodePrimitives> StaticFileProviderRW<N> {
for receipt_result in receipts_iter {
let (tx_num, receipt) = receipt_result?;
tx_number = self.append_with_tx_number(tx_num, receipt.borrow())?;
self.append_with_tx_number(tx_num, receipt.borrow())?;
tx_number = tx_num;
count += 1;
}