From 9a4c6d8a118c15115763f5f92cda1723218a8b91 Mon Sep 17 00:00:00 2001 From: joshieDo <93316087+joshieDo@users.noreply.github.com> Date: Fri, 23 Jan 2026 20:11:47 +0000 Subject: [PATCH] feat(rocksdb): static file based healing for TransactionHashNumbers (#21343) --- .../src/providers/rocksdb/invariants.rs | 221 +++++++++--------- 1 file changed, 113 insertions(+), 108 deletions(-) diff --git a/crates/storage/provider/src/providers/rocksdb/invariants.rs b/crates/storage/provider/src/providers/rocksdb/invariants.rs index 63901ac74e..286e3a41ec 100644 --- a/crates/storage/provider/src/providers/rocksdb/invariants.rs +++ b/crates/storage/provider/src/providers/rocksdb/invariants.rs @@ -9,13 +9,12 @@ use crate::StaticFileProviderFactory; use alloy_eips::eip2718::Encodable2718; use alloy_primitives::BlockNumber; use rayon::prelude::*; -use reth_db::cursor::DbCursorRO; -use reth_db_api::{tables, transaction::DbTx}; +use reth_db_api::tables; use reth_stages_types::StageId; use reth_static_file_types::StaticFileSegment; use reth_storage_api::{ - ChangeSetReader, DBProvider, StageCheckpointReader, StorageChangeSetReader, - StorageSettingsCache, TransactionsProvider, + BlockBodyIndicesProvider, ChangeSetReader, DBProvider, StageCheckpointReader, + StorageChangeSetReader, StorageSettingsCache, TransactionsProvider, }; use reth_storage_errors::provider::ProviderResult; use std::collections::HashSet; @@ -55,15 +54,16 @@ impl RocksDBProvider { + StageCheckpointReader + StorageSettingsCache + StaticFileProviderFactory + + BlockBodyIndicesProvider + StorageChangeSetReader + ChangeSetReader + TransactionsProvider, { let mut unwind_target: Option = None; - // Check TransactionHashNumbers if stored in RocksDB + // Heal TransactionHashNumbers if stored in RocksDB if provider.cached_storage_settings().transaction_hash_numbers_in_rocksdb && - let Some(target) = self.check_transaction_hash_numbers(provider)? + let Some(target) = self.heal_transaction_hash_numbers(provider)? { unwind_target = Some(unwind_target.map_or(target, |t| t.min(target))); } @@ -85,19 +85,13 @@ impl RocksDBProvider { Ok(unwind_target) } - /// Checks invariants for the `TransactionHashNumbers` table. + /// Heals the `TransactionHashNumbers` table. /// - /// Returns a block number to unwind to if MDBX is behind the checkpoint. - /// If static files are ahead of MDBX, excess `RocksDB` entries are pruned (healed). - /// - /// # Approach - /// - /// Instead of iterating `RocksDB` entries (which is expensive and doesn't give us the - /// tx range we need), we use static files and MDBX to determine what needs pruning: - /// - Static files are committed before `RocksDB`, so they're at least at the same height - /// - MDBX `TransactionBlocks` tells us what's been fully committed - /// - If static files have more transactions than MDBX, prune the excess range - fn check_transaction_hash_numbers( + /// - Fast path: if checkpoint == 0 AND `RocksDB` has data, clear everything + /// - If `sf_tip` < checkpoint, return unwind target (static files behind) + /// - If `sf_tip` == checkpoint, nothing to do + /// - If `sf_tip` > checkpoint, heal via transaction ranges in batches + fn heal_transaction_hash_numbers( &self, provider: &Provider, ) -> ProviderResult> @@ -105,73 +99,99 @@ impl RocksDBProvider { Provider: DBProvider + StageCheckpointReader + StaticFileProviderFactory + + BlockBodyIndicesProvider + TransactionsProvider, { - // Get the TransactionLookup stage checkpoint let checkpoint = provider .get_stage_checkpoint(StageId::TransactionLookup)? .map(|cp| cp.block_number) .unwrap_or(0); - // Get last tx_num from MDBX - this tells us what MDBX has fully committed - let mut cursor = provider.tx_ref().cursor_read::()?; - let mdbx_last = cursor.last()?; - - // Get highest tx_num from static files - this tells us what tx data is available - let highest_static_tx = provider + let sf_tip = provider .static_file_provider() - .get_highest_static_file_tx(StaticFileSegment::Transactions); + .get_highest_static_file_block(StaticFileSegment::Transactions) + .unwrap_or(0); - match (mdbx_last, highest_static_tx) { - (Some((mdbx_tx, mdbx_block)), Some(highest_tx)) if highest_tx > mdbx_tx => { - // Static files are ahead of MDBX - prune RocksDB entries for the excess range. - // This is the common case during recovery from a crash during unwinding. - tracing::info!( - target: "reth::providers::rocksdb", - mdbx_last_tx = mdbx_tx, - mdbx_block, - highest_static_tx = highest_tx, - "Static files ahead of MDBX, pruning TransactionHashNumbers excess data" - ); - self.prune_transaction_hash_numbers_in_range(provider, (mdbx_tx + 1)..=highest_tx)?; + // Fast path: if checkpoint is 0 and RocksDB has data, clear everything. + if checkpoint == 0 && self.first::()?.is_some() { + tracing::info!( + target: "reth::providers::rocksdb", + "TransactionHashNumbers has data but checkpoint is 0, clearing all" + ); + self.clear::()?; + return Ok(None); + } - // After pruning, check if MDBX is behind checkpoint - if checkpoint > mdbx_block { - tracing::warn!( - target: "reth::providers::rocksdb", - mdbx_block, - checkpoint, - "MDBX behind checkpoint after pruning, unwind needed" - ); - return Ok(Some(mdbx_block)); - } - } - (Some((_mdbx_tx, mdbx_block)), _) => { - // MDBX and static files are in sync (or static files don't have more data). - // Check if MDBX is behind checkpoint. - if checkpoint > mdbx_block { - tracing::warn!( - target: "reth::providers::rocksdb", - mdbx_block, - checkpoint, - "MDBX behind checkpoint, unwind needed" - ); - return Ok(Some(mdbx_block)); - } - } - (None, Some(highest_tx)) => { - // MDBX has no transactions but static files have data. - // This means RocksDB might have stale entries - prune them all. - tracing::info!( - target: "reth::providers::rocksdb", - highest_static_tx = highest_tx, - "MDBX empty but static files have data, pruning all TransactionHashNumbers" - ); - self.prune_transaction_hash_numbers_in_range(provider, 0..=highest_tx)?; - } - (None, None) => { - // Both MDBX and static files are empty, nothing to check. - } + if sf_tip < checkpoint { + // This should never happen in normal operation - static files are always committed + // before RocksDB. If we get here, something is seriously wrong. The unwind is a + // best-effort attempt but is probably futile. + tracing::warn!( + target: "reth::providers::rocksdb", + sf_tip, + checkpoint, + "TransactionHashNumbers: static file tip behind checkpoint, unwind needed" + ); + return Ok(Some(sf_tip)); + } + + // sf_tip == checkpoint - nothing to do + if sf_tip == checkpoint { + return Ok(None); + } + + // Get end tx from static files (authoritative for sf_tip) + let sf_tip_end_tx = provider + .static_file_provider() + .get_highest_static_file_tx(StaticFileSegment::Transactions) + .unwrap_or(0); + + // Get the first tx after the checkpoint block from MDBX (authoritative up to checkpoint) + let checkpoint_next_tx = provider + .block_body_indices(checkpoint)? + .map(|indices| indices.next_tx_num()) + .unwrap_or(0); + + if sf_tip_end_tx < checkpoint_next_tx { + // This should never happen in normal operation - static files should have all + // transactions up to sf_tip. If we get here, something is seriously wrong. + // The unwind is a best-effort attempt but is probably futile. + tracing::warn!( + target: "reth::providers::rocksdb", + sf_tip_end_tx, + checkpoint_next_tx, + checkpoint, + sf_tip, + "TransactionHashNumbers: static file tx tip behind checkpoint, unwind needed" + ); + return Ok(Some(sf_tip)); + } + + tracing::info!( + target: "reth::providers::rocksdb", + checkpoint, + sf_tip, + checkpoint_next_tx, + sf_tip_end_tx, + "TransactionHashNumbers: healing via transaction ranges" + ); + + const BATCH_SIZE: u64 = 10_000; + let mut batch_start = checkpoint_next_tx; + + while batch_start <= sf_tip_end_tx { + let batch_end = batch_start.saturating_add(BATCH_SIZE - 1).min(sf_tip_end_tx); + + tracing::debug!( + target: "reth::providers::rocksdb", + batch_start, + batch_end, + "Pruning TransactionHashNumbers batch" + ); + + self.prune_transaction_hash_numbers_in_range(provider, batch_start..=batch_end)?; + + batch_start = batch_end.saturating_add(1); } Ok(None) @@ -425,7 +445,7 @@ mod tests { BlockWriter, DatabaseProviderFactory, StageCheckpointWriter, TransactionsProvider, }; use alloy_primitives::{Address, B256}; - use reth_db::cursor::DbCursorRW; + use reth_db::cursor::{DbCursorRO, DbCursorRW}; use reth_db_api::{ models::{storage_sharded_key::StorageShardedKey, StorageSettings}, tables::{self, BlockNumberList}, @@ -520,13 +540,15 @@ mod tests { let provider = factory.database_provider_ro().unwrap(); // RocksDB is empty but checkpoint says block 100 was processed. - // This is treated as a first-run/migration scenario - no unwind needed. + // Since static file tip defaults to 0 when None, and 0 < 100, an unwind is triggered. let result = rocksdb.check_consistency(&provider).unwrap(); - assert_eq!(result, None, "Empty data with checkpoint is treated as first run"); + assert_eq!(result, Some(0), "Static file tip (0) behind checkpoint (100) triggers unwind"); } + /// Tests that when checkpoint=0 and `RocksDB` has data, all entries are pruned. + /// This simulates a crash recovery scenario where the checkpoint was lost. #[test] - fn test_check_consistency_mdbx_empty_static_files_have_data_prunes_rocksdb() { + fn test_check_consistency_checkpoint_zero_with_rocksdb_data_prunes_all() { let temp_dir = TempDir::new().unwrap(); let rocksdb = RocksDBBuilder::new(temp_dir.path()) .with_table::() @@ -564,22 +586,12 @@ mod tests { provider.commit().unwrap(); } - // Simulate crash recovery: MDBX was reset but static files and RocksDB still have data. - // Clear TransactionBlocks to simulate empty MDBX state. + // Explicitly clear the TransactionLookup checkpoint to simulate crash recovery { let provider = factory.database_provider_rw().unwrap(); - let mut cursor = provider.tx_ref().cursor_write::().unwrap(); - let mut to_delete = Vec::new(); - let mut walker = cursor.walk(Some(0)).unwrap(); - while let Some((tx_num, _)) = walker.next().transpose().unwrap() { - to_delete.push(tx_num); - } - drop(walker); - for tx_num in to_delete { - cursor.seek_exact(tx_num).unwrap(); - cursor.delete_current().unwrap(); - } - // No checkpoint set (checkpoint = 0) + provider + .save_stage_checkpoint(StageId::TransactionLookup, StageCheckpoint::new(0)) + .unwrap(); provider.commit().unwrap(); } @@ -588,12 +600,12 @@ mod tests { let provider = factory.database_provider_ro().unwrap(); - // MDBX TransactionBlocks is empty, but static files have transaction data. - // This means RocksDB has stale data that should be pruned (healed). + // checkpoint = 0 but RocksDB has data. + // This means RocksDB has stale data that should be cleared. let result = rocksdb.check_consistency(&provider).unwrap(); - assert_eq!(result, None, "Should heal by pruning, no unwind needed"); + assert_eq!(result, None, "Should heal by clearing, no unwind needed"); - // Verify data was pruned + // Verify data was cleared for hash in &tx_hashes { assert!( rocksdb.get::(*hash).unwrap().is_none(), @@ -669,7 +681,6 @@ mod tests { "RocksDB should be empty after pruning" ); } - #[test] fn test_check_consistency_mdbx_behind_checkpoint_needs_unwind() { let temp_dir = TempDir::new().unwrap(); @@ -707,9 +718,9 @@ mod tests { provider.commit().unwrap(); } - // Now simulate a scenario where checkpoint is ahead of MDBX. - // This happens when the checkpoint was saved but MDBX data was lost/corrupted. // Set checkpoint to block 10 (beyond our actual data at block 2) + // sf_tip is at block 2, checkpoint is at block 10 + // Since sf_tip < checkpoint, we need to unwind to sf_tip { let provider = factory.database_provider_rw().unwrap(); provider @@ -720,15 +731,9 @@ mod tests { let provider = factory.database_provider_ro().unwrap(); - // MDBX has data up to block 2, but checkpoint says block 10 was processed. - // The static files highest tx matches MDBX last tx (both at block 2). - // Checkpoint > mdbx_block means we need to unwind to rebuild. + // sf_tip (2) < checkpoint (10), so unwind to sf_tip is needed let result = rocksdb.check_consistency(&provider).unwrap(); - assert_eq!( - result, - Some(2), - "Should require unwind to block 2 (MDBX's last block) to rebuild from checkpoint" - ); + assert_eq!(result, Some(2), "sf_tip < checkpoint requires unwind to sf_tip"); } #[test]