diff --git a/crates/node/builder/src/launch/common.rs b/crates/node/builder/src/launch/common.rs
index 29860ae4d2..e385f4a1df 100644
--- a/crates/node/builder/src/launch/common.rs
+++ b/crates/node/builder/src/launch/common.rs
@@ -66,8 +66,8 @@ use reth_node_metrics::{
 };
 use reth_provider::{
     providers::{NodeTypesForProvider, ProviderNodeTypes, RocksDBProvider, StaticFileProvider},
-    BlockHashReader, BlockNumReader, DatabaseProviderFactory, ProviderError, ProviderFactory,
-    ProviderResult, RocksDBProviderFactory, StageCheckpointReader, StaticFileProviderBuilder,
+    BlockHashReader, BlockNumReader, ProviderError, ProviderFactory, ProviderResult,
+    RocksDBProviderFactory, StageCheckpointReader, StaticFileProviderBuilder,
     StaticFileProviderFactory,
 };
 use reth_prune::{PruneModes, PrunerBuilder};
@@ -507,32 +507,10 @@ where
             .with_prune_modes(self.prune_modes())
             .with_changeset_cache(changeset_cache);
 
-        // Keep MDBX, static files, and RocksDB aligned. If any check fails, unwind to the
-        // earliest consistent block.
-        //
-        // Order matters:
-        // 1) heal static files (no pruning)
-        // 2) check RocksDB (needs static-file tx data)
-        // 3) check static-file checkpoints vs MDBX (may prune)
-        //
-        // Compute one unwind target and run a single unwind.
-
-        let provider_ro = factory.database_provider_ro()?;
-
-        // Step 1: heal file-level inconsistencies (no pruning)
-        factory.static_file_provider().check_file_consistency(&provider_ro)?;
-
-        // Step 2: RocksDB consistency check (needs static files tx data)
-        let rocksdb_unwind = factory.rocksdb_provider().check_consistency(&provider_ro)?;
-
-        // Step 3: Static file checkpoint consistency (may prune)
-        let static_file_unwind = factory
-            .static_file_provider()
-            .check_consistency(&provider_ro)?
-            .map(|target| match target {
-                PipelineTarget::Unwind(block) => block,
-                PipelineTarget::Sync(_) => unreachable!("check_consistency returns Unwind"),
-            });
+        // Check consistency between the database and static files, returning
+        // the unwind targets for each storage layer if inconsistencies are
+        // found.
+        let (rocksdb_unwind, static_file_unwind) = factory.check_consistency()?;
 
         // Take the minimum block number to ensure all storage layers are consistent.
         let unwind_target = [rocksdb_unwind, static_file_unwind].into_iter().flatten().min();
diff --git a/crates/storage/errors/src/provider.rs b/crates/storage/errors/src/provider.rs
index 2ef823c5a1..51a7567644 100644
--- a/crates/storage/errors/src/provider.rs
+++ b/crates/storage/errors/src/provider.rs
@@ -177,6 +177,16 @@ pub enum ProviderError {
         /// The available range of blocks with changesets
         available: core::ops::RangeInclusive<BlockNumber>,
     },
+    /// Inconsistency detected between static files/RocksDB and the database during
+    /// `ProviderFactory::check_consistency`. The database must be unwound to
+    /// the specified block number to restore consistency.
+    #[error("consistency check failed for {data_source}. DB must be unwound to {unwind_to}")]
+    MustUnwind {
+        /// The inconsistent data source(s).
+        data_source: &'static str,
+        /// The block number to which the database must be unwound.
+        unwind_to: BlockNumber,
+    },
     /// Any other error type wrapped into a cloneable [`AnyError`].
     #[error(transparent)]
     Other(#[from] AnyError),
diff --git a/crates/storage/provider/src/providers/database/mod.rs b/crates/storage/provider/src/providers/database/mod.rs
index cf8ee2a286..e2f051f9b5 100644
--- a/crates/storage/provider/src/providers/database/mod.rs
+++ b/crates/storage/provider/src/providers/database/mod.rs
@@ -25,7 +25,7 @@ use reth_node_types::{
 };
 use reth_primitives_traits::{RecoveredBlock, SealedHeader};
 use reth_prune_types::{PruneCheckpoint, PruneModes, PruneSegment};
-use reth_stages_types::{StageCheckpoint, StageId};
+use reth_stages_types::{PipelineTarget, StageCheckpoint, StageId};
 use reth_static_file_types::StaticFileSegment;
 use reth_storage_api::{
     BlockBodyIndicesProvider, NodePrimitivesProvider, StorageSettings, StorageSettingsCache,
@@ -40,8 +40,7 @@ use std::{
     path::Path,
     sync::Arc,
 };
-
-use tracing::trace;
+use tracing::{instrument, trace};
 
 mod provider;
 pub use provider::{
@@ -90,6 +89,12 @@ impl ProviderFactory
     /// Create new database provider factory.
+    ///
+    /// The storage backends used by the produced factory MAY be inconsistent.
+    /// It is recommended to call [`Self::check_consistency`] after
+    /// creation to ensure consistency between the database and static files.
+    /// If the function returns unwind targets, the caller MUST unwind the
+    /// inner database to the minimum of the two targets to ensure consistency.
     pub fn new(
         db: N::DB,
         chain_spec: Arc<N::ChainSpec>,
         static_file_provider: StaticFileProvider<N::Primitives>,
         rocksdb_provider: RocksDBProvider,
     ) -> ProviderResult<Self> {
@@ -125,6 +130,22 @@ impl ProviderFactory {
             changeset_cache: ChangesetCache::new(),
         })
     }
+
+    /// Create new database provider factory and perform consistency checks.
+    ///
+    /// This will call [`Self::check_consistency`] internally and return
+    /// [`ProviderError::MustUnwind`] if inconsistencies are found. It may also
+    /// return any [`ProviderError`] that [`Self::new`] may return, or that is
+    /// encountered during consistency checks.
+    pub fn new_checked(
+        db: N::DB,
+        chain_spec: Arc<N::ChainSpec>,
+        static_file_provider: StaticFileProvider<N::Primitives>,
+        rocksdb_provider: RocksDBProvider,
+    ) -> ProviderResult<Self> {
+        Self::new(db, chain_spec, static_file_provider, rocksdb_provider)
+            .and_then(Self::assert_consistent)
+    }
 }
 
 impl ProviderFactory {
@@ -285,6 +306,54 @@ impl ProviderFactory {
         trace!(target: "providers::db", ?block_number, %block_hash, "Returning historical state provider for block hash");
         Ok(state_provider)
     }
+
+    /// Asserts that the static files and database are consistent. If not,
+    /// returns [`ProviderError::MustUnwind`] with the appropriate unwind
+    /// target. May also return any [`ProviderError`] that
+    /// [`Self::check_consistency`] may return.
+    pub fn assert_consistent(self) -> ProviderResult<Self> {
+        let (rocksdb_unwind, static_file_unwind) = self.check_consistency()?;
+
+        let source = match (rocksdb_unwind, static_file_unwind) {
+            (None, None) => return Ok(self),
+            (Some(_), Some(_)) => "RocksDB and Static Files",
+            (Some(_), None) => "RocksDB",
+            (None, Some(_)) => "Static Files",
+        };
+
+        Err(ProviderError::MustUnwind {
+            data_source: source,
+            unwind_to: rocksdb_unwind
+                .into_iter()
+                .chain(static_file_unwind)
+                .min()
+                .expect("at least one unwind target must be Some"),
+        })
+    }
+
+    /// Checks the consistency between the static files and the database. This
+    /// may result in static files being pruned or otherwise healed to ensure
+    /// consistency, i.e. this MAY result in writes to the static files.
+    #[instrument(err, skip(self))]
+    pub fn check_consistency(&self) -> ProviderResult<(Option<BlockNumber>, Option<BlockNumber>)> {
+        let provider_ro = self.database_provider_ro()?;
+
+        // Step 1: heal file-level inconsistencies (no pruning)
+        self.static_file_provider().check_file_consistency(&provider_ro)?;
+
+        // Step 2: RocksDB consistency check (needs static files tx data)
+        let rocksdb_unwind = self.rocksdb_provider().check_consistency(&provider_ro)?;
+
+        // Step 3: Static file checkpoint consistency (may prune)
+        let static_file_unwind = self.static_file_provider().check_consistency(&provider_ro)?.map(
+            |target| match target {
+                PipelineTarget::Unwind(block) => block,
+                PipelineTarget::Sync(_) => unreachable!("check_consistency returns Unwind"),
+            },
+        );
+
+        Ok((rocksdb_unwind, static_file_unwind))
+    }
 }
 
 impl NodePrimitivesProvider for ProviderFactory {
diff --git a/crates/storage/provider/src/providers/static_file/manager.rs b/crates/storage/provider/src/providers/static_file/manager.rs
index f5a35234fe..2092bcd135 100644
--- a/crates/storage/provider/src/providers/static_file/manager.rs
+++ b/crates/storage/provider/src/providers/static_file/manager.rs
@@ -56,7 +56,7 @@ use std::{
     sync::{atomic::AtomicU64, mpsc, Arc},
     thread,
 };
-use tracing::{debug, info, instrument, trace, warn};
+use tracing::{debug, info, info_span, instrument, trace, warn};
 
 /// Alias type for a map that can be queried for block or transaction ranges. It uses `u64` to
 /// represent either a block or a transaction number end of a static file range.
@@ -1314,6 +1314,7 @@ impl StaticFileProvider {
     ///
     /// WARNING: No static file writer should be held before calling this function, otherwise it
     /// will deadlock.
+    #[instrument(skip(self), fields(read_only = self.is_read_only()))]
     pub fn check_consistency(
         &self,
         provider: &Provider,
@@ -1350,19 +1351,28 @@ impl StaticFileProvider {
         info!(target: "reth::cli", "Verifying storage consistency.");
 
         let mut unwind_target: Option<BlockNumber> = None;
-        let mut update_unwind_target = |new_target: BlockNumber| {
-            if let Some(target) = unwind_target.as_mut() {
-                *target = (*target).min(new_target);
-            } else {
-                unwind_target = Some(new_target);
-            }
+
+        let mut update_unwind_target = |new_target| {
+            unwind_target =
+                unwind_target.map(|current| current.min(new_target)).or(Some(new_target));
         };
 
         for segment in self.segments_to_check(provider) {
-            debug!(target: "reth::providers::static_file", ?segment, "Checking consistency for segment");
+            let span = info_span!(
+                "Checking consistency for segment",
+                ?segment,
+                initial_highest_block = tracing::field::Empty,
+                highest_block = tracing::field::Empty,
+                highest_tx = tracing::field::Empty,
+            );
+            let _guard = span.enter();
+
+            debug!(target: "reth::providers::static_file", "Checking consistency for segment");
 
             // Heal file-level inconsistencies and get before/after highest block
             let (initial_highest_block, mut highest_block) = self.maybe_heal_segment(segment)?;
+            span.record("initial_highest_block", initial_highest_block);
+            span.record("highest_block", highest_block);
 
             // Only applies to block-based static files. (Headers)
             //
@@ -1371,9 +1381,7 @@
             if initial_highest_block != highest_block {
                 info!(
                     target: "reth::providers::static_file",
-                    ?initial_highest_block,
                     unwind_target = highest_block,
-                    ?segment,
                     "Setting unwind target."
                 );
                 update_unwind_target(highest_block.unwrap_or_default());
@@ -1385,101 +1393,70 @@
             // from a pruning interruption might have decreased the number of transactions without
             // being able to update the last block of the static file segment.
             let highest_tx = self.get_highest_static_file_tx(segment);
-            debug!(target: "reth::providers::static_file", ?segment, ?highest_tx, ?highest_block, "Highest transaction for segment");
+            span.record("highest_tx", highest_tx);
+            debug!(target: "reth::providers::static_file", "Checking tx index segment");
+
             if let Some(highest_tx) = highest_tx {
                 let mut last_block = highest_block.unwrap_or_default();
-                debug!(target: "reth::providers::static_file", ?segment, last_block, highest_tx, "Verifying last transaction matches last block indices");
+                debug!(target: "reth::providers::static_file", last_block, highest_tx, "Verifying last transaction matches last block indices");
                 loop {
-                    if let Some(indices) = provider.block_body_indices(last_block)? {
-                        debug!(target: "reth::providers::static_file", ?segment, last_block, last_tx_num = indices.last_tx_num(), highest_tx, "Found block body indices");
-                        if indices.last_tx_num() <= highest_tx {
-                            break;
-                        }
-                    } else {
-                        debug!(target: "reth::providers::static_file", ?segment, last_block, "Block body indices not found, static files ahead of database");
+                    let Some(indices) = provider.block_body_indices(last_block)? else {
+                        debug!(target: "reth::providers::static_file", last_block, "Block body indices not found, static files ahead of database");
                         // If the block body indices can not be found, then it means that static
                         // files is ahead of database, and the `ensure_invariants` check will fix
                         // it by comparing with stage checkpoints.
-                        break;
+                        break
+                    };
+
+                    debug!(target: "reth::providers::static_file", last_block, last_tx_num = indices.last_tx_num(), "Found block body indices");
+
+                    if indices.last_tx_num() <= highest_tx {
+                        break
                     }
+
                     if last_block == 0 {
-                        debug!(target: "reth::providers::static_file", ?segment, "Reached block 0 in verification loop");
-                        break;
+                        debug!(target: "reth::providers::static_file", "Reached block 0 in verification loop");
+                        break
                     }
+
                     last_block -= 1;
 
                     info!(
                         target: "reth::providers::static_file",
                         highest_block = self.get_highest_static_file_block(segment),
                         unwind_target = last_block,
-                        ?segment,
                         "Setting unwind target."
                     );
+                    span.record("highest_block", last_block);
                     highest_block = Some(last_block);
                     update_unwind_target(last_block);
                 }
             }
 
-            debug!(target: "reth::providers::static_file", ?segment, "Ensuring invariants for segment");
-            if let Some(unwind) = match segment {
-                StaticFileSegment::Headers => self
-                    .ensure_invariants::<_, tables::Headers>(
-                        provider,
-                        segment,
-                        highest_block,
-                        highest_block,
-                    )?,
-                StaticFileSegment::Transactions => self
-                    .ensure_invariants::<_, tables::Transactions>(
-                        provider,
-                        segment,
-                        highest_tx,
-                        highest_block,
-                    )?,
-                StaticFileSegment::Receipts => self
-                    .ensure_invariants::<_, tables::Receipts>(
-                        provider,
-                        segment,
-                        highest_tx,
-                        highest_block,
-                    )?,
-                StaticFileSegment::TransactionSenders => self
-                    .ensure_invariants::<_, tables::TransactionSenders>(
-                        provider,
-                        segment,
-                        highest_tx,
-                        highest_block,
-                    )?,
-                StaticFileSegment::AccountChangeSets => self
-                    .ensure_invariants::<_, tables::AccountChangeSets>(
-                        provider,
-                        segment,
-                        highest_tx,
-                        highest_block,
-                    )?,
-                StaticFileSegment::StorageChangeSets => self
-                    .ensure_changeset_invariants_by_block::<_, tables::StorageChangeSets, _>(
-                        provider,
-                        segment,
-                        highest_block,
-                        |key| key.block_number(),
-                    )?,
-            } {
-                debug!(target: "reth::providers::static_file", ?segment, unwind_target=unwind, "Invariants check returned unwind target");
-                update_unwind_target(unwind);
-            } else {
-                debug!(target: "reth::providers::static_file", ?segment, "Invariants check completed, no unwind needed");
+            debug!(target: "reth::providers::static_file", "Ensuring invariants for segment");
+
+            match self.ensure_invariants_for(provider, segment, highest_tx, highest_block)? {
+                Some(unwind) => {
+                    debug!(target: "reth::providers::static_file", unwind_target=unwind, "Invariants check returned unwind target");
+                    update_unwind_target(unwind);
+                }
+                None => {
+                    debug!(target: "reth::providers::static_file", "Invariants check completed, no unwind needed")
+                }
             }
         }
 
         Ok(unwind_target.map(PipelineTarget::Unwind))
     }
 
-    /// Heals file-level (`NippyJar`) inconsistencies for eligible static file segments.
+    /// Heals file-level (`NippyJar`) inconsistencies for eligible static file
+    /// segments.
     ///
-    /// Call before [`Self::check_consistency`] so files are internally consistent.
-    /// Uses the same segment-skip logic as [`Self::check_consistency`], but does not compare with
-    /// database checkpoints or prune against them.
+    /// Call before [`Self::check_consistency`] so files are internally
+    /// consistent.
+    ///
+    /// Uses the same segment-skip logic as [`Self::check_consistency`], but
+    /// does not compare with database checkpoints or prune against them.
     pub fn check_file_consistency<Provider>(&self, provider: &Provider) -> ProviderResult<()>
     where
         Provider: DBProvider + ChainSpecProvider + StorageSettingsCache + PruneCheckpointReader,
@@ -1487,6 +1464,7 @@ impl StaticFileProvider {
         info!(target: "reth::cli", "Healing static file inconsistencies.");
 
         for segment in self.segments_to_check(provider) {
+            let _guard = info_span!("Healing static file segment", ?segment).entered();
            let _ = self.maybe_heal_segment(segment)?;
         }
 
@@ -1505,6 +1483,7 @@
             .filter(move |segment| self.should_check_segment(provider, *segment))
     }
 
+    /// True if the given segment should be checked/healed for this provider.
     fn should_check_segment<Provider>(
         &self,
         provider: &Provider,
@@ -1579,23 +1558,25 @@
             .is_some_and(|checkpoint| checkpoint.prune_mode.is_full())
     }
 
-    /// Checks consistency of the latest static file segment and throws an error if at fault.
+    /// Checks consistency of the latest static file segment and throws an
+    /// error if at fault.
+    ///
     /// Read-only.
-    pub fn check_segment_consistency(&self, segment: StaticFileSegment) -> ProviderResult<()> {
-        debug!(target: "reth::providers::static_file", ?segment, "Checking segment consistency");
+    fn check_segment_consistency(&self, segment: StaticFileSegment) -> ProviderResult<()> {
+        debug!(target: "reth::providers::static_file", "Checking segment consistency");
         if let Some(latest_block) = self.get_highest_static_file_block(segment) {
             let file_path = self
                 .directory()
                 .join(segment.filename(&self.find_fixed_range(segment, latest_block)));
-            debug!(target: "reth::providers::static_file", ?segment, ?file_path, latest_block, "Loading NippyJar for consistency check");
+            debug!(target: "reth::providers::static_file", ?file_path, latest_block, "Loading NippyJar for consistency check");
             let jar = NippyJar::<SegmentHeader>::load(&file_path).map_err(ProviderError::other)?;
-            debug!(target: "reth::providers::static_file", ?segment, "NippyJar loaded, checking consistency");
+            debug!(target: "reth::providers::static_file", "NippyJar loaded, checking consistency");
             NippyJarChecker::new(jar).check_consistency().map_err(ProviderError::other)?;
-            debug!(target: "reth::providers::static_file", ?segment, "NippyJar consistency check passed");
+            debug!(target: "reth::providers::static_file", "NippyJar consistency check passed");
         } else {
-            debug!(target: "reth::providers::static_file", ?segment, "No static file block found, skipping consistency check");
+            debug!(target: "reth::providers::static_file", "No static file block found, skipping consistency check");
         }
         Ok(())
     }
@@ -1620,27 +1601,85 @@
         segment: StaticFileSegment,
     ) -> ProviderResult<(Option<BlockNumber>, Option<BlockNumber>)> {
         let initial_highest_block = self.get_highest_static_file_block(segment);
-        debug!(target: "reth::providers::static_file", ?segment, ?initial_highest_block, "Initial highest block for segment");
+        debug!(target: "reth::providers::static_file", ?initial_highest_block, "Initial highest block for segment");
 
         if self.access.is_read_only() {
             // Read-only mode: cannot modify files, so just validate consistency and error if
             // broken.
-            debug!(target: "reth::providers::static_file", ?segment, "Checking segment consistency (read-only)");
+            debug!(target: "reth::providers::static_file", "Checking segment consistency (read-only)");
            self.check_segment_consistency(segment)?;
         } else {
             // Writable mode: fetching the writer will automatically heal any file-level
             // inconsistency by truncating data to match the last committed config.
-            debug!(target: "reth::providers::static_file", ?segment, "Fetching latest writer which might heal any potential inconsistency");
+            debug!(target: "reth::providers::static_file", "Fetching latest writer which might heal any potential inconsistency");
             self.latest_writer(segment)?;
         }
 
-        // The updated `highest_block` may have decreased if we healed from a pruning
-        // interruption.
+        // The updated `highest_block` may have decreased if we healed from a
+        // pruning interruption.
         let highest_block = self.get_highest_static_file_block(segment);
 
         Ok((initial_highest_block, highest_block))
     }
 
+    /// Ensure invariants for each corresponding table and static file segment.
+    fn ensure_invariants_for<Provider>(
+        &self,
+        provider: &Provider,
+        segment: StaticFileSegment,
+        highest_tx: Option<TxNumber>,
+        highest_block: Option<BlockNumber>,
+    ) -> ProviderResult<Option<BlockNumber>>
+    where
+        Provider: DBProvider + BlockReader + StageCheckpointReader,
+        N: NodePrimitives,
+    {
+        match segment {
+            StaticFileSegment::Headers => self
+                .ensure_invariants::<_, tables::Headers>(
+                    provider,
+                    segment,
+                    highest_block,
+                    highest_block,
+                ),
+            StaticFileSegment::Transactions => self
+                .ensure_invariants::<_, tables::Transactions>(
+                    provider,
+                    segment,
+                    highest_tx,
+                    highest_block,
+                ),
+            StaticFileSegment::Receipts => self
+                .ensure_invariants::<_, tables::Receipts>(
+                    provider,
+                    segment,
+                    highest_tx,
+                    highest_block,
+                ),
+            StaticFileSegment::TransactionSenders => self
+                .ensure_invariants::<_, tables::TransactionSenders>(
+                    provider,
+                    segment,
+                    highest_tx,
+                    highest_block,
+                ),
+            StaticFileSegment::AccountChangeSets => self
+                .ensure_invariants::<_, tables::AccountChangeSets>(
+                    provider,
+                    segment,
+                    highest_tx,
+                    highest_block,
+                ),
+            StaticFileSegment::StorageChangeSets => self
+                .ensure_changeset_invariants_by_block::<_, tables::StorageChangeSets, _>(
+                    provider,
+                    segment,
+                    highest_block,
+                    |key| key.block_number(),
+                ),
+        }
+    }
+
     /// Check invariants for each corresponding table and static file segment:
     ///
     /// * the corresponding database table should overlap or have continuity in their keys
@@ -1655,6 +1694,7 @@
     ///
     /// * If the database tables overlap with static files and have contiguous keys, or the
     /// checkpoint block matches the highest static files block, then [`None`] will be returned.
+    #[instrument(skip(self, provider, segment), fields(table = T::NAME))]
     fn ensure_invariants<Provider, T: Table<Key = u64>>(
         &self,
         provider: &Provider,
@@ -1665,11 +1705,11 @@
     where
         Provider: DBProvider + BlockReader + StageCheckpointReader,
     {
-        debug!(target: "reth::providers::static_file", ?segment, ?highest_static_file_entry, ?highest_static_file_block, "Ensuring invariants");
+        debug!(target: "reth::providers::static_file", "Ensuring invariants");
         let mut db_cursor = provider.tx_ref().cursor_read::<T>()?;
 
         if let Some((db_first_entry, _)) = db_cursor.first()? {
-            debug!(target: "reth::providers::static_file", ?segment, db_first_entry, "Found first database entry");
+            debug!(target: "reth::providers::static_file", db_first_entry, "Found first database entry");
             if let (Some(highest_entry), Some(highest_block)) =
                 (highest_static_file_entry, highest_static_file_block)
             {
@@ -1682,7 +1722,6 @@
                     ?db_first_entry,
                     ?highest_entry,
                     unwind_target = highest_block,
-                    ?segment,
                     "Setting unwind target."
                 );
                 return Ok(Some(highest_block));
@@ -1693,11 +1732,11 @@
                 highest_static_file_entry
                     .is_none_or(|highest_entry| db_last_entry > highest_entry)
             {
-                debug!(target: "reth::providers::static_file", ?segment, db_last_entry, ?highest_static_file_entry, "Database has entries beyond static files, no unwind needed");
-                return Ok(None);
+                debug!(target: "reth::providers::static_file", db_last_entry, "Database has entries beyond static files, no unwind needed");
+                return Ok(None)
             }
         } else {
-            debug!(target: "reth::providers::static_file", ?segment, "No database entries found");
+            debug!(target: "reth::providers::static_file", "No database entries found");
         }
 
         let highest_static_file_entry = highest_static_file_entry.unwrap_or_default();
@@ -1708,7 +1747,7 @@
         let stage_id = segment.to_stage_id();
         let checkpoint_block_number =
             provider.get_stage_checkpoint(stage_id)?.unwrap_or_default().block_number;
-        debug!(target: "reth::providers::static_file", ?segment, ?stage_id, checkpoint_block_number, highest_static_file_block, "Retrieved stage checkpoint");
+        debug!(target: "reth::providers::static_file", ?stage_id, checkpoint_block_number, "Retrieved stage checkpoint");
 
         // If the checkpoint is ahead, then we lost static file data. May be data corruption.
         if checkpoint_block_number > highest_static_file_block {
@@ -1716,71 +1755,77 @@
                 target: "reth::providers::static_file",
                 checkpoint_block_number,
                 unwind_target = highest_static_file_block,
-                ?segment,
                 "Setting unwind target."
             );
             return Ok(Some(highest_static_file_block));
         }
 
-        // If the checkpoint is behind, then we failed to do a database commit **but committed** to
-        // static files on executing a stage, or the reverse on unwinding a stage.
-        // All we need to do is to prune the extra static file rows.
-        if checkpoint_block_number < highest_static_file_block {
-            info!(
-                target: "reth::providers",
-                ?segment,
-                from = highest_static_file_block,
-                to = checkpoint_block_number,
-                "Unwinding static file segment."
-            );
-            let mut writer = self.latest_writer(segment)?;
-            match segment {
-                StaticFileSegment::Headers => {
-                    let prune_count = highest_static_file_block - checkpoint_block_number;
-                    debug!(target: "reth::providers::static_file", ?segment, prune_count, "Pruning headers");
-                    // TODO(joshie): is_block_meta
-                    writer.prune_headers(prune_count)?;
-                }
-                StaticFileSegment::Transactions |
-                StaticFileSegment::Receipts |
-                StaticFileSegment::TransactionSenders => {
-                    if let Some(block) = provider.block_body_indices(checkpoint_block_number)? {
-                        let number = highest_static_file_entry - block.last_tx_num();
-                        debug!(target: "reth::providers::static_file", ?segment, prune_count = number, checkpoint_block_number, "Pruning transaction based segment");
-
-                        match segment {
-                            StaticFileSegment::Transactions => {
-                                writer.prune_transactions(number, checkpoint_block_number)?
-                            }
-                            StaticFileSegment::Receipts => {
-                                writer.prune_receipts(number, checkpoint_block_number)?
-                            }
-                            StaticFileSegment::TransactionSenders => {
-                                writer.prune_transaction_senders(number, checkpoint_block_number)?
-                            }
-                            StaticFileSegment::Headers |
-                            StaticFileSegment::AccountChangeSets |
-                            StaticFileSegment::StorageChangeSets => {
-                                unreachable!()
-                            }
-                        }
-                    } else {
-                        debug!(target: "reth::providers::static_file", ?segment, checkpoint_block_number, "No block body indices found for checkpoint block");
-                    }
-                }
-                StaticFileSegment::AccountChangeSets => {
-                    writer.prune_account_changesets(checkpoint_block_number)?;
-                }
-                StaticFileSegment::StorageChangeSets => {
-                    writer.prune_storage_changesets(checkpoint_block_number)?;
-                }
-            }
-            debug!(target: "reth::providers::static_file", ?segment, "Committing writer after pruning");
-            writer.commit()?;
-            debug!(target: "reth::providers::static_file", ?segment, "Writer committed successfully");
+        // If the checkpoint is ahead, or matches, then nothing to do.
+        if checkpoint_block_number >= highest_static_file_block {
+            debug!(target: "reth::providers::static_file", "Invariants ensured, returning None");
+            return Ok(None);
         }
-        debug!(target: "reth::providers::static_file", ?segment, "Invariants ensured, returning None");
 
+        // If the checkpoint is behind, then we failed to do a database commit
+        // **but committed** to static files on executing a stage, or the
+        // reverse on unwinding a stage.
+        //
+        // All we need to do is to prune the extra static file rows.
+        info!(
+            target: "reth::providers",
+            from = highest_static_file_block,
+            to = checkpoint_block_number,
+            "Unwinding static file segment."
+        );
+        let mut writer = self.latest_writer(segment)?;
+
+        match segment {
+            StaticFileSegment::Headers => {
+                let prune_count = highest_static_file_block - checkpoint_block_number;
+                debug!(target: "reth::providers::static_file", prune_count, "Pruning headers");
+                // TODO(joshie): is_block_meta
+                writer.prune_headers(prune_count)?;
+            }
+            StaticFileSegment::Transactions |
+            StaticFileSegment::Receipts |
+            StaticFileSegment::TransactionSenders => {
+                if let Some(block) = provider.block_body_indices(checkpoint_block_number)? {
+                    let number = highest_static_file_entry - block.last_tx_num();
+                    debug!(target: "reth::providers::static_file", prune_count = number, checkpoint_block_number, "Pruning transaction based segment");
+
+                    match segment {
+                        StaticFileSegment::Transactions => {
+                            writer.prune_transactions(number, checkpoint_block_number)?
+                        }
+                        StaticFileSegment::Receipts => {
+                            writer.prune_receipts(number, checkpoint_block_number)?
+                        }
+                        StaticFileSegment::TransactionSenders => {
+                            writer.prune_transaction_senders(number, checkpoint_block_number)?
+                        }
+                        StaticFileSegment::Headers |
+                        StaticFileSegment::AccountChangeSets |
+                        StaticFileSegment::StorageChangeSets => {
+                            unreachable!()
+                        }
+                    }
+                } else {
+                    debug!(target: "reth::providers::static_file", checkpoint_block_number, "No block body indices found for checkpoint block");
+                }
+            }
+            StaticFileSegment::AccountChangeSets => {
+                writer.prune_account_changesets(checkpoint_block_number)?;
+            }
+            StaticFileSegment::StorageChangeSets => {
+                writer.prune_storage_changesets(checkpoint_block_number)?;
+            }
+        }
+
+        debug!(target: "reth::providers::static_file", "Committing writer after pruning");
+        writer.commit()?;
+        debug!(target: "reth::providers::static_file", "Writer committed successfully");
+
+        debug!(target: "reth::providers::static_file", "Invariants ensured, returning None");
         Ok(None)
     }
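
A minimal usage sketch, not part of the patch itself: how startup code might consume the new `ProviderFactory::new_checked` constructor and the `ProviderError::MustUnwind` variant introduced above. The wrapper function name, the exact generic bounds, and the logging are illustrative assumptions; only `new_checked`, `check_consistency`, and `MustUnwind` come from this change, and the `reth_provider` / `reth_storage_errors` items are assumed to be in scope.

// Sketch only: `open_checked_factory` is a hypothetical caller-side helper.
fn open_checked_factory<N: ProviderNodeTypes>(
    db: N::DB,
    chain_spec: Arc<N::ChainSpec>,
    static_files: StaticFileProvider<N::Primitives>,
    rocksdb: RocksDBProvider,
) -> ProviderResult<ProviderFactory<N>> {
    match ProviderFactory::new_checked(db, chain_spec, static_files, rocksdb) {
        Ok(factory) => Ok(factory),
        Err(ProviderError::MustUnwind { data_source, unwind_to }) => {
            // The storage layers disagree: the caller is expected to unwind the
            // database to `unwind_to` (e.g. by running the pipeline with an
            // unwind target) before retrying the checked construction.
            tracing::warn!(data_source, unwind_to, "storage inconsistent, unwind required");
            Err(ProviderError::MustUnwind { data_source, unwind_to })
        }
        Err(other) => Err(other),
    }
}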