From d72105b47c2f15e3a79b727de775849ed56beeb6 Mon Sep 17 00:00:00 2001 From: YK Date: Tue, 30 Dec 2025 14:17:32 +0800 Subject: [PATCH] fix(storage): rocksdb consistency check on startup (#20596) Co-authored-by: Federico Gimenez --- crates/node/builder/src/launch/common.rs | 61 +++++++++--- .../src/providers/rocksdb/invariants.rs | 49 ++++++++-- .../provider/src/providers/rocksdb_stub.rs | 60 ++++++++++++ .../src/providers/static_file/manager.rs | 94 +++++++++++++------ 4 files changed, 218 insertions(+), 46 deletions(-) diff --git a/crates/node/builder/src/launch/common.rs b/crates/node/builder/src/launch/common.rs index 2f19c48ba0..b5e069e000 100644 --- a/crates/node/builder/src/launch/common.rs +++ b/crates/node/builder/src/launch/common.rs @@ -66,8 +66,9 @@ use reth_node_metrics::{ }; use reth_provider::{ providers::{NodeTypesForProvider, ProviderNodeTypes, RocksDBProvider, StaticFileProvider}, - BlockHashReader, BlockNumReader, ProviderError, ProviderFactory, ProviderResult, - StageCheckpointReader, StaticFileProviderBuilder, StaticFileProviderFactory, + BlockHashReader, BlockNumReader, DatabaseProviderFactory, ProviderError, ProviderFactory, + ProviderResult, RocksDBProviderFactory, StageCheckpointReader, StaticFileProviderBuilder, + StaticFileProviderFactory, }; use reth_prune::{PruneModes, PrunerBuilder}; use reth_rpc_builder::config::RethRpcServerConfig; @@ -497,20 +498,54 @@ where )? .with_prune_modes(self.prune_modes()); - // Check for consistency between database and static files. If it fails, it unwinds to - // the first block that's consistent between database and static files. - if let Some(unwind_target) = - factory.static_file_provider().check_consistency(&factory.provider()?)? - { + // Keep MDBX, static files, and RocksDB aligned. If any check fails, unwind to the + // earliest consistent block. + // + // Order matters: + // 1) heal static files (no pruning) + // 2) check RocksDB (needs static-file tx data) + // 3) check static-file checkpoints vs MDBX (may prune) + // + // Compute one unwind target and run a single unwind. + + let provider_ro = factory.database_provider_ro()?; + + // Step 1: heal file-level inconsistencies (no pruning) + factory.static_file_provider().check_file_consistency(&provider_ro)?; + + // Step 2: RocksDB consistency check (needs static files tx data) + let rocksdb_unwind = factory.rocksdb_provider().check_consistency(&provider_ro)?; + + // Step 3: Static file checkpoint consistency (may prune) + let static_file_unwind = factory + .static_file_provider() + .check_consistency(&provider_ro)? + .map(|target| match target { + PipelineTarget::Unwind(block) => block, + PipelineTarget::Sync(_) => unreachable!("check_consistency returns Unwind"), + }); + + // Take the minimum block number to ensure all storage layers are consistent. + let unwind_target = [rocksdb_unwind, static_file_unwind].into_iter().flatten().min(); + + if let Some(unwind_block) = unwind_target { // Highly unlikely to happen, and given its destructive nature, it's better to panic - // instead. + // instead. Unwinding to 0 would leave MDBX with a huge free list size. + let inconsistency_source = match (rocksdb_unwind, static_file_unwind) { + (Some(_), Some(_)) => "RocksDB and static file", + (Some(_), None) => "RocksDB", + (None, Some(_)) => "static file", + (None, None) => unreachable!(), + }; assert_ne!( - unwind_target, - PipelineTarget::Unwind(0), - "A static file <> database inconsistency was found that would trigger an unwind to block 0" + unwind_block, 0, + "A {} inconsistency was found that would trigger an unwind to block 0", + inconsistency_source ); - info!(target: "reth::cli", unwind_target = %unwind_target, "Executing an unwind after a failed storage consistency check."); + let unwind_target = PipelineTarget::Unwind(unwind_block); + + info!(target: "reth::cli", %unwind_target, %inconsistency_source, "Executing unwind after consistency check."); let (_tip_tx, tip_rx) = watch::channel(B256::ZERO); @@ -544,7 +579,7 @@ where }), ); rx.await?.inspect_err(|err| { - error!(target: "reth::cli", unwind_target = %unwind_target, %err, "failed to run unwind") + error!(target: "reth::cli", %unwind_target, %inconsistency_source, %err, "failed to run unwind") })?; } diff --git a/crates/storage/provider/src/providers/rocksdb/invariants.rs b/crates/storage/provider/src/providers/rocksdb/invariants.rs index ee746f151e..7a5c5f9db3 100644 --- a/crates/storage/provider/src/providers/rocksdb/invariants.rs +++ b/crates/storage/provider/src/providers/rocksdb/invariants.rs @@ -282,11 +282,8 @@ impl RocksDBProvider { "StoragesHistory ahead of checkpoint, pruning excess data" ); self.prune_storages_history_above(checkpoint)?; - return Ok(None); - } - - // If RocksDB is behind the checkpoint, request an unwind to rebuild. - if max_highest_block < checkpoint { + } else if max_highest_block < checkpoint { + // RocksDB is behind checkpoint, return highest block to signal unwind needed tracing::warn!( target: "reth::providers::rocksdb", rocks_highest = max_highest_block, @@ -718,6 +715,46 @@ mod tests { ); } + #[test] + fn test_check_consistency_storages_history_behind_checkpoint_needs_unwind() { + let temp_dir = TempDir::new().unwrap(); + let rocksdb = RocksDBBuilder::new(temp_dir.path()) + .with_table::() + .build() + .unwrap(); + + // Insert data into RocksDB with max highest_block_number = 80 + let key_block_50 = StorageShardedKey::new(Address::ZERO, B256::ZERO, 50); + let key_block_80 = StorageShardedKey::new(Address::ZERO, B256::from([1u8; 32]), 80); + let key_block_max = StorageShardedKey::new(Address::ZERO, B256::from([2u8; 32]), u64::MAX); + + let block_list = BlockNumberList::new_pre_sorted([10, 20, 30]); + rocksdb.put::(key_block_50, &block_list).unwrap(); + rocksdb.put::(key_block_80, &block_list).unwrap(); + rocksdb.put::(key_block_max, &block_list).unwrap(); + + // Create a test provider factory for MDBX + let factory = create_test_provider_factory(); + factory.set_storage_settings_cache( + StorageSettings::legacy().with_storages_history_in_rocksdb(true), + ); + + // Set checkpoint to block 100 + { + let provider = factory.database_provider_rw().unwrap(); + provider + .save_stage_checkpoint(StageId::IndexStorageHistory, StageCheckpoint::new(100)) + .unwrap(); + provider.commit().unwrap(); + } + + let provider = factory.database_provider_ro().unwrap(); + + // RocksDB max highest_block (80) is behind checkpoint (100) + let result = rocksdb.check_consistency(&provider).unwrap(); + assert_eq!(result, Some(80), "Should unwind to the highest block present in RocksDB"); + } + #[test] fn test_check_consistency_mdbx_behind_checkpoint_needs_unwind() { let temp_dir = TempDir::new().unwrap(); @@ -942,7 +979,7 @@ mod tests { } #[test] - fn test_check_consistency_storages_history_behind_checkpoint_needs_unwind() { + fn test_check_consistency_storages_history_behind_checkpoint_single_entry() { use reth_db_api::models::storage_sharded_key::StorageShardedKey; let temp_dir = TempDir::new().unwrap(); diff --git a/crates/storage/provider/src/providers/rocksdb_stub.rs b/crates/storage/provider/src/providers/rocksdb_stub.rs index 32b79c1880..5fac73eca7 100644 --- a/crates/storage/provider/src/providers/rocksdb_stub.rs +++ b/crates/storage/provider/src/providers/rocksdb_stub.rs @@ -77,6 +77,39 @@ impl RocksDBProvider { pub const fn tx(&self) -> RocksTx { RocksTx } + + /// Creates a new batch for atomic writes (stub implementation). + pub const fn batch(&self) -> RocksDBBatch { + RocksDBBatch + } + + /// Gets the first key-value pair from a table (stub implementation). + pub const fn first(&self) -> ProviderResult> { + Ok(None) + } + + /// Gets the last key-value pair from a table (stub implementation). + pub const fn last(&self) -> ProviderResult> { + Ok(None) + } + + /// Creates an iterator for the specified table (stub implementation). + /// + /// Returns an empty iterator. This is consistent with `first()` and `last()` returning + /// `Ok(None)` - the stub behaves as if the database is empty rather than unavailable. + pub const fn iter(&self) -> ProviderResult> { + Ok(RocksDBIter { _marker: std::marker::PhantomData }) + } + + /// Check consistency of `RocksDB` tables (stub implementation). + /// + /// Returns `None` since there is no `RocksDB` data to check when the feature is disabled. + pub const fn check_consistency( + &self, + _provider: &Provider, + ) -> ProviderResult> { + Ok(None) + } } /// A stub batch writer for `RocksDB` on non-Unix platforms. @@ -102,6 +135,25 @@ impl RocksDBBatch { pub fn delete(&self, _key: T::Key) -> ProviderResult<()> { Err(UnsupportedProvider) } + + /// Commits the batch (stub implementation). + pub const fn commit(self) -> ProviderResult<()> { + Err(UnsupportedProvider) + } +} + +/// A stub iterator for `RocksDB` (non-transactional). +#[derive(Debug)] +pub struct RocksDBIter<'a, T> { + _marker: std::marker::PhantomData<(&'a (), T)>, +} + +impl Iterator for RocksDBIter<'_, T> { + type Item = ProviderResult<(T::Key, T::Value)>; + + fn next(&mut self) -> Option { + None + } } /// A stub builder for `RocksDB` on non-Unix platforms. @@ -213,3 +265,11 @@ impl RocksTx { pub struct RocksTxIter<'a, T> { _marker: std::marker::PhantomData<(&'a (), T)>, } + +impl Iterator for RocksTxIter<'_, T> { + type Item = ProviderResult<(T::Key, T::Value)>; + + fn next(&mut self) -> Option { + None + } +} diff --git a/crates/storage/provider/src/providers/static_file/manager.rs b/crates/storage/provider/src/providers/static_file/manager.rs index 1c1762d70d..944f1e30bf 100644 --- a/crates/storage/provider/src/providers/static_file/manager.rs +++ b/crates/storage/provider/src/providers/static_file/manager.rs @@ -1055,34 +1055,8 @@ impl StaticFileProvider { } }; - for segment in StaticFileSegment::iter() { + for segment in self.segments_to_check(provider) { debug!(target: "reth::providers::static_file", ?segment, "Checking consistency for segment"); - match segment { - StaticFileSegment::Headers | StaticFileSegment::Transactions => {} - StaticFileSegment::Receipts => { - if EitherWriter::receipts_destination(provider).is_database() { - // Old pruned nodes (including full node) do not store receipts as static - // files. - debug!(target: "reth::providers::static_file", ?segment, "Skipping receipts consistency check: receipts stored in database"); - continue - } - - if NamedChain::Gnosis == provider.chain_spec().chain_id() || - NamedChain::Chiado == provider.chain_spec().chain_id() - { - // Gnosis and Chiado's historical import is broken and does not work with - // this check. They are importing receipts along - // with importing headers/bodies. - debug!(target: "reth::providers::static_file", ?segment, "Skipping receipts consistency check: broken historical import for gnosis/chiado"); - continue; - } - } - StaticFileSegment::TransactionSenders => { - if EitherWriterDestination::senders(provider).is_database() { - continue - } - } - } // Heal file-level inconsistencies and get before/after highest block let (initial_highest_block, mut highest_block) = self.maybe_heal_segment(segment)?; @@ -1184,6 +1158,72 @@ impl StaticFileProvider { Ok(unwind_target.map(PipelineTarget::Unwind)) } + /// Heals file-level (`NippyJar`) inconsistencies for eligible static file segments. + /// + /// Call before [`Self::check_consistency`] so files are internally consistent. + /// Uses the same segment-skip logic as [`Self::check_consistency`], but does not compare with + /// database checkpoints or prune against them. + pub fn check_file_consistency(&self, provider: &Provider) -> ProviderResult<()> + where + Provider: DBProvider + ChainSpecProvider + StorageSettingsCache, + { + info!(target: "reth::cli", "Healing static file inconsistencies."); + + for segment in self.segments_to_check(provider) { + let _ = self.maybe_heal_segment(segment)?; + } + + Ok(()) + } + + /// Returns the static file segments that should be checked/healed for this provider. + fn segments_to_check<'a, Provider>( + &'a self, + provider: &'a Provider, + ) -> impl Iterator + 'a + where + Provider: DBProvider + ChainSpecProvider + StorageSettingsCache, + { + StaticFileSegment::iter() + .filter(move |segment| self.should_check_segment(provider, *segment)) + } + + fn should_check_segment( + &self, + provider: &Provider, + segment: StaticFileSegment, + ) -> bool + where + Provider: DBProvider + ChainSpecProvider + StorageSettingsCache, + { + match segment { + StaticFileSegment::Headers | StaticFileSegment::Transactions => true, + StaticFileSegment::Receipts => { + if EitherWriter::receipts_destination(provider).is_database() { + // Old pruned nodes (including full node) do not store receipts as static + // files. + debug!(target: "reth::providers::static_file", ?segment, "Skipping receipts segment: receipts stored in database"); + return false + } + + if NamedChain::Gnosis == provider.chain_spec().chain_id() || + NamedChain::Chiado == provider.chain_spec().chain_id() + { + // Gnosis and Chiado's historical import is broken and does not work with + // this check. They are importing receipts along + // with importing headers/bodies. + debug!(target: "reth::providers::static_file", ?segment, "Skipping receipts segment: broken historical import for gnosis/chiado"); + return false; + } + + true + } + StaticFileSegment::TransactionSenders => { + !EitherWriterDestination::senders(provider).is_database() + } + } + } + /// Checks consistency of the latest static file segment and throws an error if at fault. /// Read-only. pub fn check_segment_consistency(&self, segment: StaticFileSegment) -> ProviderResult<()> {