From decb56fae1c9e056548cd4d9f81744f14cae87d8 Mon Sep 17 00:00:00 2001 From: joshieDo <93316087+joshieDo@users.noreply.github.com> Date: Fri, 23 Jan 2026 19:28:10 +0000 Subject: [PATCH] feat(rocksdb): changeset-based crash recovery healing for history indices (#21341) --- .../src/providers/rocksdb/invariants.rs | 1162 +++++++++++------ 1 file changed, 741 insertions(+), 421 deletions(-) diff --git a/crates/storage/provider/src/providers/rocksdb/invariants.rs b/crates/storage/provider/src/providers/rocksdb/invariants.rs index 75be8ca5ad..63901ac74e 100644 --- a/crates/storage/provider/src/providers/rocksdb/invariants.rs +++ b/crates/storage/provider/src/providers/rocksdb/invariants.rs @@ -14,9 +14,15 @@ use reth_db_api::{tables, transaction::DbTx}; use reth_stages_types::StageId; use reth_static_file_types::StaticFileSegment; use reth_storage_api::{ - DBProvider, StageCheckpointReader, StorageSettingsCache, TransactionsProvider, + ChangeSetReader, DBProvider, StageCheckpointReader, StorageChangeSetReader, + StorageSettingsCache, TransactionsProvider, }; use reth_storage_errors::provider::ProviderResult; +use std::collections::HashSet; + +/// Batch size for changeset iteration during history healing. +/// Balances memory usage against iteration overhead. +const HEAL_HISTORY_BATCH_SIZE: u64 = 10_000; impl RocksDBProvider { /// Checks consistency of `RocksDB` tables against MDBX stage checkpoints. @@ -32,10 +38,8 @@ impl RocksDBProvider { /// - If `RocksDB` is ahead, excess entries are pruned (healed). /// - If `RocksDB` is behind, an unwind is required. /// - /// For `StoragesHistory`: - /// - The maximum block number in shards should not exceed the `IndexStorageHistory` stage - /// checkpoint. - /// - Similar healing/unwind logic applies. + /// For `StoragesHistory` and `AccountsHistory`: + /// - Uses changesets to heal stale entries when static file tip > checkpoint. /// /// # Requirements /// @@ -51,6 +55,8 @@ impl RocksDBProvider { + StageCheckpointReader + StorageSettingsCache + StaticFileProviderFactory + + StorageChangeSetReader + + ChangeSetReader + TransactionsProvider, { let mut unwind_target: Option = None; @@ -62,16 +68,16 @@ impl RocksDBProvider { unwind_target = Some(unwind_target.map_or(target, |t| t.min(target))); } - // Check StoragesHistory if stored in RocksDB + // Heal StoragesHistory if stored in RocksDB if provider.cached_storage_settings().storages_history_in_rocksdb && - let Some(target) = self.check_storages_history(provider)? + let Some(target) = self.heal_storages_history(provider)? { unwind_target = Some(unwind_target.map_or(target, |t| t.min(target))); } - // Check AccountsHistory if stored in RocksDB + // Heal AccountsHistory if stored in RocksDB if provider.cached_storage_settings().account_history_in_rocksdb && - let Some(target) = self.check_accounts_history(provider)? + let Some(target) = self.heal_accounts_history(provider)? { unwind_target = Some(unwind_target.map_or(target, |t| t.min(target))); } @@ -221,251 +227,192 @@ impl RocksDBProvider { Ok(()) } - /// Checks invariants for the `StoragesHistory` table. + /// Heals the `StoragesHistory` table by removing stale entries. /// - /// Returns a block number to unwind to if `RocksDB` is behind the checkpoint. - /// If `RocksDB` is ahead of the checkpoint, excess entries are pruned (healed). - fn check_storages_history( + /// Returns an unwind target if static file tip is behind checkpoint (cannot heal). + /// Otherwise iterates changesets in batches to identify and unwind affected keys. 
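+    ///
+    /// Rough shape of the healing loop below (an illustrative sketch of the real code, using
+    /// pseudocode names for brevity):
+    ///
+    /// ```text
+    /// sf_tip = highest StorageChangeSets static file block (0 if none)
+    /// for batch in (checkpoint + 1..=sf_tip).chunks(HEAL_HISTORY_BATCH_SIZE) {
+    ///     keys = unique (address, storage_key) pairs in the batch's changesets
+    ///     unwind_storage_history_indices(keys, starting from block checkpoint + 1)
+    /// }
+    /// ```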
+    fn heal_storages_history<Provider>(
         &self,
         provider: &Provider,
     ) -> ProviderResult<Option<BlockNumber>>
     where
-        Provider: DBProvider + StageCheckpointReader,
+        Provider:
+            DBProvider + StageCheckpointReader + StaticFileProviderFactory + StorageChangeSetReader,
     {
-        // Get the IndexStorageHistory stage checkpoint
         let checkpoint = provider
             .get_stage_checkpoint(StageId::IndexStorageHistory)?
             .map(|cp| cp.block_number)
             .unwrap_or(0);
 
-        // Check if RocksDB has any data
-        let rocks_first = self.first::<tables::StoragesHistory>()?;
-
-        match rocks_first {
-            Some(_) => {
-                // If checkpoint is 0 but we have data, clear everything
-                if checkpoint == 0 {
-                    tracing::info!(
-                        target: "reth::providers::rocksdb",
-                        "StoragesHistory has data but checkpoint is 0, clearing all"
-                    );
-                    self.prune_storages_history_above(0)?;
-                    return Ok(None);
-                }
-
-                // Find the max highest_block_number (excluding u64::MAX sentinel) across all
-                // entries. Also track if we found any non-sentinel entries.
-                let mut max_highest_block = 0u64;
-                let mut found_non_sentinel = false;
-                for result in self.iter::<tables::StoragesHistory>()? {
-                    let (key, _) = result?;
-                    let highest = key.sharded_key.highest_block_number;
-                    if highest != u64::MAX {
-                        found_non_sentinel = true;
-                        if highest > max_highest_block {
-                            max_highest_block = highest;
-                        }
-                    }
-                }
-
-                // If all entries are sentinel entries (u64::MAX), treat as first-run scenario.
-                // This means no completed shards exist (only sentinel shards with
-                // highest_block_number=u64::MAX), so no actual history has been indexed.
-                if !found_non_sentinel {
-                    return Ok(None);
-                }
-
-                // If any entry has highest_block > checkpoint, prune excess
-                if max_highest_block > checkpoint {
-                    tracing::info!(
-                        target: "reth::providers::rocksdb",
-                        rocks_highest = max_highest_block,
-                        checkpoint,
-                        "StoragesHistory ahead of checkpoint, pruning excess data"
-                    );
-                    self.prune_storages_history_above(checkpoint)?;
-                } else if max_highest_block < checkpoint {
-                    // RocksDB is behind checkpoint, return highest block to signal unwind needed
-                    tracing::warn!(
-                        target: "reth::providers::rocksdb",
-                        rocks_highest = max_highest_block,
-                        checkpoint,
-                        "StoragesHistory behind checkpoint, unwind needed"
-                    );
-                    return Ok(Some(max_highest_block));
-                }
-
-                Ok(None)
-            }
-            None => {
-                // Empty RocksDB table, nothing to check.
-                Ok(None)
-            }
-        }
-    }
-
-    /// Prunes `StoragesHistory` entries where `highest_block_number` > `max_block`.
-    ///
-    /// For `StoragesHistory`, the key contains `highest_block_number`, so we can iterate
-    /// and delete entries where `key.sharded_key.highest_block_number > max_block`.
-    ///
-    /// TODO(): this iterates the whole table,
-    /// which is inefficient. Use changeset-based pruning instead.
-    fn prune_storages_history_above(&self, max_block: BlockNumber) -> ProviderResult<()> {
-        use reth_db_api::models::storage_sharded_key::StorageShardedKey;
-
-        let mut to_delete: Vec<StorageShardedKey> = Vec::new();
-        for result in self.iter::<tables::StoragesHistory>()? {
-            let (key, _) = result?;
-            let highest_block = key.sharded_key.highest_block_number;
-            if max_block == 0 || (highest_block != u64::MAX && highest_block > max_block) {
-                to_delete.push(key);
-            }
-        }
-
-        let deleted = to_delete.len();
-        if deleted > 0 {
+        // Fast path: if checkpoint is 0 and RocksDB has data, clear everything.
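+        // (A zero checkpoint means the index stage has never completed, so any rows present
+        // can only be leftovers from an interrupted run.)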
+        if checkpoint == 0 && self.first::<tables::StoragesHistory>()?.is_some() {
             tracing::info!(
                 target: "reth::providers::rocksdb",
-                deleted_count = deleted,
-                max_block,
-                "Pruning StoragesHistory entries"
+                "StoragesHistory has data but checkpoint is 0, clearing all"
             );
-
-            let mut batch = self.batch();
-            for key in to_delete {
-                batch.delete::<tables::StoragesHistory>(key)?;
-            }
-            batch.commit()?;
+            self.clear::<tables::StoragesHistory>()?;
+            return Ok(None);
         }
 
-        Ok(())
+        let sf_tip = provider
+            .static_file_provider()
+            .get_highest_static_file_block(StaticFileSegment::StorageChangeSets)
+            .unwrap_or(0);
+
+        if sf_tip < checkpoint {
+            // This should never happen in normal operation - static files are always
+            // committed before RocksDB. If we get here, something is seriously wrong.
+            // The unwind is a best-effort attempt but is probably futile.
+            tracing::warn!(
+                target: "reth::providers::rocksdb",
+                sf_tip,
+                checkpoint,
+                "StoragesHistory: static file tip behind checkpoint, unwind needed"
+            );
+            return Ok(Some(sf_tip));
+        }
+
+        if sf_tip == checkpoint {
+            return Ok(None);
+        }
+
+        let total_blocks = sf_tip - checkpoint;
+        tracing::info!(
+            target: "reth::providers::rocksdb",
+            checkpoint,
+            sf_tip,
+            total_blocks,
+            "StoragesHistory: healing via changesets"
+        );
+
+        let mut batch_start = checkpoint + 1;
+        let mut batch_num = 0u64;
+        let total_batches = total_blocks.div_ceil(HEAL_HISTORY_BATCH_SIZE);
+
+        while batch_start <= sf_tip {
+            let batch_end = (batch_start + HEAL_HISTORY_BATCH_SIZE - 1).min(sf_tip);
+            batch_num += 1;
+
+            let changesets = provider.storage_changesets_range(batch_start..=batch_end)?;
+
+            let unique_keys: HashSet<_> = changesets
+                .into_iter()
+                .map(|(block_addr, entry)| (block_addr.address(), entry.key, checkpoint + 1))
+                .collect();
+            let indices: Vec<_> = unique_keys.into_iter().collect();
+
+            if !indices.is_empty() {
+                tracing::info!(
+                    target: "reth::providers::rocksdb",
+                    batch_num,
+                    total_batches,
+                    batch_start,
+                    batch_end,
+                    indices_count = indices.len(),
+                    "StoragesHistory: unwinding batch"
+                );
+
+                let batch = self.unwind_storage_history_indices(&indices)?;
+                self.commit_batch(batch)?;
+            }
+
+            batch_start = batch_end + 1;
+        }
+
+        Ok(None)
     }
 
-    /// Checks invariants for the `AccountsHistory` table.
+    /// Heals the `AccountsHistory` table by removing stale entries.
     ///
-    /// Returns a block number to unwind to if `RocksDB` is behind the checkpoint.
-    /// If `RocksDB` is ahead of the checkpoint, excess entries are pruned (healed).
-    fn check_accounts_history<Provider>(
+    /// Returns an unwind target if static file tip is behind checkpoint (cannot heal).
+    /// Otherwise iterates changesets in batches to identify and unwind affected keys.
+    fn heal_accounts_history<Provider>(
         &self,
         provider: &Provider,
     ) -> ProviderResult<Option<BlockNumber>>
     where
-        Provider: DBProvider + StageCheckpointReader,
+        Provider: DBProvider + StageCheckpointReader + StaticFileProviderFactory + ChangeSetReader,
     {
-        // Get the IndexAccountHistory stage checkpoint
         let checkpoint = provider
             .get_stage_checkpoint(StageId::IndexAccountHistory)?
             .map(|cp| cp.block_number)
             .unwrap_or(0);
 
-        // Check if RocksDB has any data
-        let rocks_first = self.first::<tables::AccountsHistory>()?;
-
-        match rocks_first {
-            Some(_) => {
-                // If checkpoint is 0 but we have data, clear everything
-                if checkpoint == 0 {
-                    tracing::info!(
-                        target: "reth::providers::rocksdb",
-                        "AccountsHistory has data but checkpoint is 0, clearing all"
-                    );
-                    self.prune_accounts_history_above(0)?;
-                    return Ok(None);
-                }
-
-                // Find the max highest_block_number (excluding u64::MAX sentinel) across all
-                // entries. 
Also track if we found any non-sentinel entries.
-                let mut max_highest_block = 0u64;
-                let mut found_non_sentinel = false;
-                for result in self.iter::<tables::AccountsHistory>()? {
-                    let (key, _) = result?;
-                    let highest = key.highest_block_number;
-                    if highest != u64::MAX {
-                        found_non_sentinel = true;
-                        if highest > max_highest_block {
-                            max_highest_block = highest;
-                        }
-                    }
-                }
-
-                // If all entries are sentinel entries (u64::MAX), treat as first-run scenario.
-                // This means no completed shards exist (only sentinel shards with
-                // highest_block_number=u64::MAX), so no actual history has been indexed.
-                if !found_non_sentinel {
-                    return Ok(None);
-                }
-
-                // If any entry has highest_block > checkpoint, prune excess
-                if max_highest_block > checkpoint {
-                    tracing::info!(
-                        target: "reth::providers::rocksdb",
-                        rocks_highest = max_highest_block,
-                        checkpoint,
-                        "AccountsHistory ahead of checkpoint, pruning excess data"
-                    );
-                    self.prune_accounts_history_above(checkpoint)?;
-                    return Ok(None);
-                }
-
-                // If RocksDB is behind the checkpoint, request an unwind to rebuild.
-                if max_highest_block < checkpoint {
-                    tracing::warn!(
-                        target: "reth::providers::rocksdb",
-                        rocks_highest = max_highest_block,
-                        checkpoint,
-                        "AccountsHistory behind checkpoint, unwind needed"
-                    );
-                    return Ok(Some(max_highest_block));
-                }
-
-                Ok(None)
-            }
-            None => {
-                // Empty RocksDB table, nothing to check.
-                Ok(None)
-            }
-        }
-    }
-
-    /// Prunes `AccountsHistory` entries where `highest_block_number` > `max_block`.
-    ///
-    /// For `AccountsHistory`, the key is `ShardedKey<Address>
` which contains - /// `highest_block_number`, so we can iterate and delete entries where - /// `key.highest_block_number > max_block`. - /// - /// TODO(): this iterates the whole table, - /// which is inefficient. Use changeset-based pruning instead. - fn prune_accounts_history_above(&self, max_block: BlockNumber) -> ProviderResult<()> { - use alloy_primitives::Address; - use reth_db_api::models::ShardedKey; - - let mut to_delete: Vec> = Vec::new(); - for result in self.iter::()? { - let (key, _) = result?; - let highest_block = key.highest_block_number; - if max_block == 0 || (highest_block != u64::MAX && highest_block > max_block) { - to_delete.push(key); - } - } - - let deleted = to_delete.len(); - if deleted > 0 { + // Fast path: if checkpoint is 0 and RocksDB has data, clear everything. + if checkpoint == 0 && self.first::()?.is_some() { tracing::info!( target: "reth::providers::rocksdb", - deleted_count = deleted, - max_block, - "Pruning AccountsHistory entries" + "AccountsHistory has data but checkpoint is 0, clearing all" ); - - let mut batch = self.batch(); - for key in to_delete { - batch.delete::(key)?; - } - batch.commit()?; + self.clear::()?; + return Ok(None); } - Ok(()) + let sf_tip = provider + .static_file_provider() + .get_highest_static_file_block(StaticFileSegment::AccountChangeSets) + .unwrap_or(0); + + if sf_tip < checkpoint { + // This should never happen in normal operation - static files are always + // committed before RocksDB. If we get here, something is seriously wrong. + // The unwind is a best-effort attempt but is probably futile. + tracing::warn!( + target: "reth::providers::rocksdb", + sf_tip, + checkpoint, + "AccountsHistory: static file tip behind checkpoint, unwind needed" + ); + return Ok(Some(sf_tip)); + } + + if sf_tip == checkpoint { + return Ok(None); + } + + let total_blocks = sf_tip - checkpoint; + tracing::info!( + target: "reth::providers::rocksdb", + checkpoint, + sf_tip, + total_blocks, + "AccountsHistory: healing via changesets" + ); + + let mut batch_start = checkpoint + 1; + let mut batch_num = 0u64; + let total_batches = total_blocks.div_ceil(HEAL_HISTORY_BATCH_SIZE); + + while batch_start <= sf_tip { + let batch_end = (batch_start + HEAL_HISTORY_BATCH_SIZE - 1).min(sf_tip); + batch_num += 1; + + let changesets = provider.account_changesets_range(batch_start..=batch_end)?; + + let mut addresses = HashSet::with_capacity(changesets.len()); + addresses.extend(changesets.iter().map(|(_, cs)| cs.address)); + let unwind_from = checkpoint + 1; + let indices: Vec<_> = addresses.into_iter().map(|addr| (addr, unwind_from)).collect(); + + if !indices.is_empty() { + tracing::info!( + target: "reth::providers::rocksdb", + batch_num, + total_batches, + batch_start, + batch_end, + indices_count = indices.len(), + "AccountsHistory: unwinding batch" + ); + + let batch = self.unwind_account_history_indices(&indices)?; + self.commit_batch(batch)?; + } + + batch_start = batch_end + 1; + } + + Ok(None) } } @@ -473,8 +420,9 @@ impl RocksDBProvider { mod tests { use super::*; use crate::{ - providers::rocksdb::RocksDBBuilder, test_utils::create_test_provider_factory, BlockWriter, - DatabaseProviderFactory, StageCheckpointWriter, TransactionsProvider, + providers::{rocksdb::RocksDBBuilder, static_file::StaticFileWriter}, + test_utils::create_test_provider_factory, + BlockWriter, DatabaseProviderFactory, StageCheckpointWriter, TransactionsProvider, }; use alloy_primitives::{Address, B256}; use reth_db::cursor::DbCursorRW; @@ -680,9 +628,10 @@ mod tests { let 
provider = factory.database_provider_ro().unwrap(); // RocksDB is empty but checkpoint says block 100 was processed. - // This is treated as a first-run/migration scenario - no unwind needed. + // Since sf_tip=0 < checkpoint=100, we return unwind target of 0. + // This should never happen in normal operation. let result = rocksdb.check_consistency(&provider).unwrap(); - assert_eq!(result, None, "Empty RocksDB with checkpoint is treated as first run"); + assert_eq!(result, Some(0), "sf_tip=0 < checkpoint=100 returns unwind target"); } #[test] @@ -721,46 +670,6 @@ mod tests { ); } - #[test] - fn test_check_consistency_storages_history_behind_checkpoint_needs_unwind() { - let temp_dir = TempDir::new().unwrap(); - let rocksdb = RocksDBBuilder::new(temp_dir.path()) - .with_table::() - .build() - .unwrap(); - - // Insert data into RocksDB with max highest_block_number = 80 - let key_block_50 = StorageShardedKey::new(Address::ZERO, B256::ZERO, 50); - let key_block_80 = StorageShardedKey::new(Address::ZERO, B256::from([1u8; 32]), 80); - let key_block_max = StorageShardedKey::new(Address::ZERO, B256::from([2u8; 32]), u64::MAX); - - let block_list = BlockNumberList::new_pre_sorted([10, 20, 30]); - rocksdb.put::(key_block_50, &block_list).unwrap(); - rocksdb.put::(key_block_80, &block_list).unwrap(); - rocksdb.put::(key_block_max, &block_list).unwrap(); - - // Create a test provider factory for MDBX - let factory = create_test_provider_factory(); - factory.set_storage_settings_cache( - StorageSettings::legacy().with_storages_history_in_rocksdb(true), - ); - - // Set checkpoint to block 100 - { - let provider = factory.database_provider_rw().unwrap(); - provider - .save_stage_checkpoint(StageId::IndexStorageHistory, StageCheckpoint::new(100)) - .unwrap(); - provider.commit().unwrap(); - } - - let provider = factory.database_provider_ro().unwrap(); - - // RocksDB max highest_block (80) is behind checkpoint (100) - let result = rocksdb.check_consistency(&provider).unwrap(); - assert_eq!(result, Some(80), "Should unwind to the highest block present in RocksDB"); - } - #[test] fn test_check_consistency_mdbx_behind_checkpoint_needs_unwind() { let temp_dir = TempDir::new().unwrap(); @@ -923,67 +832,6 @@ mod tests { } } - #[test] - fn test_check_consistency_storages_history_ahead_of_checkpoint_prunes_excess() { - let temp_dir = TempDir::new().unwrap(); - let rocksdb = RocksDBBuilder::new(temp_dir.path()) - .with_table::() - .build() - .unwrap(); - - // Insert data into RocksDB with different highest_block_numbers - let key_block_50 = StorageShardedKey::new(Address::ZERO, B256::ZERO, 50); - let key_block_100 = StorageShardedKey::new(Address::ZERO, B256::from([1u8; 32]), 100); - let key_block_150 = StorageShardedKey::new(Address::ZERO, B256::from([2u8; 32]), 150); - let key_block_max = StorageShardedKey::new(Address::ZERO, B256::from([3u8; 32]), u64::MAX); - - let block_list = BlockNumberList::new_pre_sorted([10, 20, 30]); - rocksdb.put::(key_block_50.clone(), &block_list).unwrap(); - rocksdb.put::(key_block_100.clone(), &block_list).unwrap(); - rocksdb.put::(key_block_150.clone(), &block_list).unwrap(); - rocksdb.put::(key_block_max.clone(), &block_list).unwrap(); - - // Create a test provider factory for MDBX - let factory = create_test_provider_factory(); - factory.set_storage_settings_cache( - StorageSettings::legacy().with_storages_history_in_rocksdb(true), - ); - - // Set checkpoint to block 100 - { - let provider = factory.database_provider_rw().unwrap(); - provider - 
.save_stage_checkpoint(StageId::IndexStorageHistory, StageCheckpoint::new(100)) - .unwrap(); - provider.commit().unwrap(); - } - - let provider = factory.database_provider_ro().unwrap(); - - // RocksDB has entries with highest_block = 150 which exceeds checkpoint (100) - // Should prune entries where highest_block > 100 (but not u64::MAX sentinel) - let result = rocksdb.check_consistency(&provider).unwrap(); - assert_eq!(result, None, "Should heal by pruning, no unwind needed"); - - // Verify key_block_150 was pruned, but others remain - assert!( - rocksdb.get::(key_block_50).unwrap().is_some(), - "Entry with highest_block=50 should remain" - ); - assert!( - rocksdb.get::(key_block_100).unwrap().is_some(), - "Entry with highest_block=100 should remain" - ); - assert!( - rocksdb.get::(key_block_150).unwrap().is_none(), - "Entry with highest_block=150 should be pruned" - ); - assert!( - rocksdb.get::(key_block_max).unwrap().is_some(), - "Entry with highest_block=u64::MAX (sentinel) should remain" - ); - } - #[test] fn test_check_consistency_storages_history_sentinel_only_with_checkpoint_is_first_run() { let temp_dir = TempDir::new().unwrap(); @@ -1020,13 +868,11 @@ mod tests { let provider = factory.database_provider_ro().unwrap(); - // RocksDB has only sentinel entries (no completed shards) but checkpoint is set. - // This is treated as a first-run/migration scenario - no unwind needed. + // RocksDB has only sentinel entries but checkpoint is set. + // Since sf_tip=0 < checkpoint=100, we return unwind target of 0. + // This should never happen in normal operation. let result = rocksdb.check_consistency(&provider).unwrap(); - assert_eq!( - result, None, - "Sentinel-only entries with checkpoint should be treated as first run" - ); + assert_eq!(result, Some(0), "sf_tip=0 < checkpoint=100 returns unwind target"); } #[test] @@ -1066,53 +912,11 @@ mod tests { let provider = factory.database_provider_ro().unwrap(); - // RocksDB has only sentinel entries (no completed shards) but checkpoint is set. - // This is treated as a first-run/migration scenario - no unwind needed. + // RocksDB has only sentinel entries but checkpoint is set. + // Since sf_tip=0 < checkpoint=100, we return unwind target of 0. + // This should never happen in normal operation. 
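+        // (sf_tip falls back to 0 here because no changeset static files were ever written.)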
let result = rocksdb.check_consistency(&provider).unwrap(); - assert_eq!( - result, None, - "Sentinel-only entries with checkpoint should be treated as first run" - ); - } - - #[test] - fn test_check_consistency_storages_history_behind_checkpoint_single_entry() { - use reth_db_api::models::storage_sharded_key::StorageShardedKey; - - let temp_dir = TempDir::new().unwrap(); - let rocksdb = RocksDBBuilder::new(temp_dir.path()) - .with_table::() - .build() - .unwrap(); - - // Insert data into RocksDB with highest_block_number below checkpoint - let key_block_50 = StorageShardedKey::new(Address::ZERO, B256::ZERO, 50); - let block_list = BlockNumberList::new_pre_sorted([10, 20, 30, 50]); - rocksdb.put::(key_block_50, &block_list).unwrap(); - - let factory = create_test_provider_factory(); - factory.set_storage_settings_cache( - StorageSettings::legacy().with_storages_history_in_rocksdb(true), - ); - - // Set checkpoint to block 100 - { - let provider = factory.database_provider_rw().unwrap(); - provider - .save_stage_checkpoint(StageId::IndexStorageHistory, StageCheckpoint::new(100)) - .unwrap(); - provider.commit().unwrap(); - } - - let provider = factory.database_provider_ro().unwrap(); - - // RocksDB only has data up to block 50, but checkpoint says block 100 was processed - let result = rocksdb.check_consistency(&provider).unwrap(); - assert_eq!( - result, - Some(50), - "Should require unwind to block 50 to rebuild StoragesHistory" - ); + assert_eq!(result, Some(0), "sf_tip=0 < checkpoint=100 returns unwind target"); } /// Test that pruning works by fetching transactions and computing their hashes, @@ -1257,9 +1061,10 @@ mod tests { let provider = factory.database_provider_ro().unwrap(); // RocksDB is empty but checkpoint says block 100 was processed. - // This is treated as a first-run/migration scenario - no unwind needed. + // Since sf_tip=0 < checkpoint=100, we return unwind target of 0. + // This should never happen in normal operation. 
let result = rocksdb.check_consistency(&provider).unwrap(); - assert_eq!(result, None, "Empty RocksDB with checkpoint is treated as first run"); + assert_eq!(result, Some(0), "sf_tip=0 < checkpoint=100 returns unwind target"); } #[test] @@ -1301,8 +1106,10 @@ mod tests { } #[test] - fn test_check_consistency_accounts_history_ahead_of_checkpoint_prunes_excess() { + fn test_check_consistency_accounts_history_sf_tip_equals_checkpoint_no_action() { + use reth_db::models::AccountBeforeTx; use reth_db_api::models::ShardedKey; + use reth_static_file_types::StaticFileSegment; let temp_dir = TempDir::new().unwrap(); let rocksdb = RocksDBBuilder::new(temp_dir.path()) @@ -1310,17 +1117,21 @@ mod tests { .build() .unwrap(); - // Insert data into RocksDB with different highest_block_numbers - let key_block_50 = ShardedKey::new(Address::ZERO, 50); - let key_block_100 = ShardedKey::new(Address::random(), 100); - let key_block_150 = ShardedKey::new(Address::random(), 150); - let key_block_max = ShardedKey::new(Address::random(), u64::MAX); + // Insert some AccountsHistory entries with various highest_block_numbers + let key1 = ShardedKey::new(Address::ZERO, 50); + let key2 = ShardedKey::new(Address::random(), 75); + let key3 = ShardedKey::new(Address::random(), u64::MAX); // sentinel + let block_list1 = BlockNumberList::new_pre_sorted([10, 20, 30, 50]); + let block_list2 = BlockNumberList::new_pre_sorted([40, 60, 75]); + let block_list3 = BlockNumberList::new_pre_sorted([80, 90, 100]); + rocksdb.put::(key1, &block_list1).unwrap(); + rocksdb.put::(key2, &block_list2).unwrap(); + rocksdb.put::(key3, &block_list3).unwrap(); - let block_list = BlockNumberList::new_pre_sorted([10, 20, 30]); - rocksdb.put::(key_block_50.clone(), &block_list).unwrap(); - rocksdb.put::(key_block_100.clone(), &block_list).unwrap(); - rocksdb.put::(key_block_150.clone(), &block_list).unwrap(); - rocksdb.put::(key_block_max.clone(), &block_list).unwrap(); + // Capture RocksDB state before consistency check + let entries_before: Vec<_> = + rocksdb.iter::().unwrap().map(|r| r.unwrap()).collect(); + assert_eq!(entries_before.len(), 3, "Should have 3 entries before check"); // Create a test provider factory for MDBX let factory = create_test_provider_factory(); @@ -1328,7 +1139,21 @@ mod tests { StorageSettings::legacy().with_account_history_in_rocksdb(true), ); - // Set checkpoint to block 100 + // Write account changesets to static files for blocks 0-100 + { + let sf_provider = factory.static_file_provider(); + let mut writer = + sf_provider.latest_writer(StaticFileSegment::AccountChangeSets).unwrap(); + + for block_num in 0..=100 { + let changeset = vec![AccountBeforeTx { address: Address::random(), info: None }]; + writer.append_account_changeset(changeset, block_num).unwrap(); + } + + writer.commit().unwrap(); + } + + // Set IndexAccountHistory checkpoint to block 100 (same as sf_tip) { let provider = factory.database_provider_rw().unwrap(); provider @@ -1339,33 +1164,285 @@ mod tests { let provider = factory.database_provider_ro().unwrap(); - // RocksDB has entries with highest_block = 150 which exceeds checkpoint (100) - // Should prune entries where highest_block > 100 (but not u64::MAX sentinel) - let result = rocksdb.check_consistency(&provider).unwrap(); - assert_eq!(result, None, "Should heal by pruning, no unwind needed"); + // Verify sf_tip equals checkpoint (both at 100) + let sf_tip = provider + .static_file_provider() + .get_highest_static_file_block(StaticFileSegment::AccountChangeSets) + .unwrap(); + 
assert_eq!(sf_tip, 100, "Static file tip should be 100"); - // Verify key_block_150 was pruned, but others remain - assert!( - rocksdb.get::(key_block_50).unwrap().is_some(), - "Entry with highest_block=50 should remain" + // Run check_consistency - should return None (no unwind needed) + let result = rocksdb.check_consistency(&provider).unwrap(); + assert_eq!(result, None, "sf_tip == checkpoint should not require unwind"); + + // Verify NO entries are deleted - RocksDB state unchanged + let entries_after: Vec<_> = + rocksdb.iter::().unwrap().map(|r| r.unwrap()).collect(); + + assert_eq!( + entries_after.len(), + entries_before.len(), + "RocksDB entry count should be unchanged when sf_tip == checkpoint" ); - assert!( - rocksdb.get::(key_block_100).unwrap().is_some(), - "Entry with highest_block=100 should remain" + + // Verify exact entries are preserved + for (before, after) in entries_before.iter().zip(entries_after.iter()) { + assert_eq!(before.0.key, after.0.key, "Entry key should be unchanged"); + assert_eq!( + before.0.highest_block_number, after.0.highest_block_number, + "Entry highest_block_number should be unchanged" + ); + assert_eq!(before.1, after.1, "Entry block list should be unchanged"); + } + } + + /// Tests `StoragesHistory` changeset-based healing with enough blocks to trigger batching. + /// + /// Scenario: + /// 1. Generate 15,000 blocks worth of storage changeset data (to exceed the 10k batch size) + /// 2. Each block has 1 storage change (address + slot + value) + /// 3. Write storage changesets to static files for all 15k blocks + /// 4. Set `IndexStorageHistory` checkpoint to block 5000 + /// 5. Insert stale `StoragesHistory` entries in `RocksDB` for (address, slot) pairs that + /// changed in blocks 5001-15000 + /// 6. Run `check_consistency` + /// 7. Verify stale entries for blocks > 5000 are pruned and batching worked + #[test] + fn test_check_consistency_storages_history_heals_via_changesets_large_range() { + use alloy_primitives::U256; + use reth_db_api::models::StorageBeforeTx; + + const TOTAL_BLOCKS: u64 = 15_000; + const CHECKPOINT_BLOCK: u64 = 5_000; + + let temp_dir = TempDir::new().unwrap(); + let rocksdb = RocksDBBuilder::new(temp_dir.path()) + .with_table::() + .build() + .unwrap(); + + let factory = create_test_provider_factory(); + factory.set_storage_settings_cache( + StorageSettings::legacy() + .with_storages_history_in_rocksdb(true) + .with_storage_changesets_in_static_files(true), ); - assert!( - rocksdb.get::(key_block_150).unwrap().is_none(), - "Entry with highest_block=150 should be pruned" + + // Helper to generate address from block number (reuses stack arrays) + #[inline] + fn make_address(block_num: u64) -> Address { + let mut addr_bytes = [0u8; 20]; + addr_bytes[0..8].copy_from_slice(&block_num.to_le_bytes()); + Address::from(addr_bytes) + } + + // Helper to generate slot from block number (reuses stack arrays) + #[inline] + fn make_slot(block_num: u64) -> B256 { + let mut slot_bytes = [0u8; 32]; + slot_bytes[0..8].copy_from_slice(&block_num.to_le_bytes()); + B256::from(slot_bytes) + } + + // Write storage changesets to static files for 15k blocks. + // Each block has 1 storage change with a unique (address, slot) pair. 
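+        // Healing will later cover blocks 5_001..=14_999 (9_999 blocks), i.e. at most a single
+        // HEAL_HISTORY_BATCH_SIZE batch, while the changeset data spans all 15k blocks.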
+ { + let sf_provider = factory.static_file_provider(); + let mut writer = + sf_provider.latest_writer(StaticFileSegment::StorageChangeSets).unwrap(); + + // Reuse changeset vec to avoid repeated allocations + let mut changeset = Vec::with_capacity(1); + + for block_num in 0..TOTAL_BLOCKS { + changeset.clear(); + changeset.push(StorageBeforeTx { + address: make_address(block_num), + key: make_slot(block_num), + value: U256::from(block_num), + }); + + writer.append_storage_changeset(changeset.clone(), block_num).unwrap(); + } + + writer.commit().unwrap(); + } + + // Verify static files have data up to block 14999 + { + let sf_provider = factory.static_file_provider(); + let highest = sf_provider + .get_highest_static_file_block(StaticFileSegment::StorageChangeSets) + .unwrap(); + assert_eq!(highest, TOTAL_BLOCKS - 1, "Static files should have blocks 0..14999"); + } + + // Set IndexStorageHistory checkpoint to block 5000 + { + let provider = factory.database_provider_rw().unwrap(); + provider + .save_stage_checkpoint( + StageId::IndexStorageHistory, + StageCheckpoint::new(CHECKPOINT_BLOCK), + ) + .unwrap(); + provider.commit().unwrap(); + } + + // Insert stale StoragesHistory entries for blocks 5001-14999 + // These are (address, slot) pairs that changed after the checkpoint + for block_num in (CHECKPOINT_BLOCK + 1)..TOTAL_BLOCKS { + let key = + StorageShardedKey::new(make_address(block_num), make_slot(block_num), block_num); + let block_list = BlockNumberList::new_pre_sorted([block_num]); + rocksdb.put::(key, &block_list).unwrap(); + } + + // Verify RocksDB has stale entries before healing + let count_before: usize = rocksdb.iter::().unwrap().count(); + assert_eq!( + count_before, + (TOTAL_BLOCKS - CHECKPOINT_BLOCK - 1) as usize, + "Should have {} stale entries before healing", + TOTAL_BLOCKS - CHECKPOINT_BLOCK - 1 ); - assert!( - rocksdb.get::(key_block_max).unwrap().is_some(), - "Entry with highest_block=u64::MAX (sentinel) should remain" + + // Run check_consistency - this should heal by pruning stale entries + let provider = factory.database_provider_ro().unwrap(); + let result = rocksdb.check_consistency(&provider).unwrap(); + assert_eq!(result, None, "Should heal via changesets, no unwind needed"); + + // Verify all stale entries were pruned + // After healing, entries with highest_block_number > checkpoint should be gone + let mut remaining_stale = 0; + for result in rocksdb.iter::().unwrap() { + let (key, _) = result.unwrap(); + if key.sharded_key.highest_block_number > CHECKPOINT_BLOCK { + remaining_stale += 1; + } + } + assert_eq!( + remaining_stale, 0, + "All stale entries (block > {}) should be pruned", + CHECKPOINT_BLOCK ); } + /// Tests that healing preserves entries at exactly the checkpoint block. + /// + /// This catches off-by-one bugs where checkpoint block data is incorrectly deleted. 
#[test] - fn test_check_consistency_accounts_history_behind_checkpoint_needs_unwind() { + fn test_check_consistency_storages_history_preserves_checkpoint_block() { + use alloy_primitives::U256; + use reth_db_api::models::StorageBeforeTx; + + const CHECKPOINT_BLOCK: u64 = 100; + const SF_TIP: u64 = 200; + + let temp_dir = TempDir::new().unwrap(); + let rocksdb = RocksDBBuilder::new(temp_dir.path()) + .with_table::() + .build() + .unwrap(); + + let factory = create_test_provider_factory(); + factory.set_storage_settings_cache( + StorageSettings::legacy() + .with_storages_history_in_rocksdb(true) + .with_storage_changesets_in_static_files(true), + ); + + let checkpoint_addr = Address::repeat_byte(0xAA); + let checkpoint_slot = B256::repeat_byte(0xBB); + let stale_addr = Address::repeat_byte(0xCC); + let stale_slot = B256::repeat_byte(0xDD); + + // Write storage changesets to static files + { + let sf_provider = factory.static_file_provider(); + let mut writer = + sf_provider.latest_writer(StaticFileSegment::StorageChangeSets).unwrap(); + + for block_num in 0..=SF_TIP { + let changeset = if block_num == CHECKPOINT_BLOCK { + vec![StorageBeforeTx { + address: checkpoint_addr, + key: checkpoint_slot, + value: U256::from(block_num), + }] + } else if block_num > CHECKPOINT_BLOCK { + vec![StorageBeforeTx { + address: stale_addr, + key: stale_slot, + value: U256::from(block_num), + }] + } else { + vec![StorageBeforeTx { + address: Address::ZERO, + key: B256::ZERO, + value: U256::ZERO, + }] + }; + writer.append_storage_changeset(changeset, block_num).unwrap(); + } + writer.commit().unwrap(); + } + + // Set checkpoint + { + let provider = factory.database_provider_rw().unwrap(); + provider + .save_stage_checkpoint( + StageId::IndexStorageHistory, + StageCheckpoint::new(CHECKPOINT_BLOCK), + ) + .unwrap(); + provider.commit().unwrap(); + } + + // Insert entry AT the checkpoint block (should be preserved) + let checkpoint_key = + StorageShardedKey::new(checkpoint_addr, checkpoint_slot, CHECKPOINT_BLOCK); + let checkpoint_list = BlockNumberList::new_pre_sorted([CHECKPOINT_BLOCK]); + rocksdb.put::(checkpoint_key.clone(), &checkpoint_list).unwrap(); + + // Insert stale entry AFTER the checkpoint (should be removed) + let stale_key = StorageShardedKey::new(stale_addr, stale_slot, SF_TIP); + let stale_list = BlockNumberList::new_pre_sorted([CHECKPOINT_BLOCK + 1, SF_TIP]); + rocksdb.put::(stale_key.clone(), &stale_list).unwrap(); + + // Run healing + let provider = factory.database_provider_ro().unwrap(); + let result = rocksdb.check_consistency(&provider).unwrap(); + assert_eq!(result, None, "Should heal without unwind"); + + // Verify checkpoint block entry is PRESERVED + let preserved = rocksdb.get::(checkpoint_key).unwrap(); + assert!(preserved.is_some(), "Entry at checkpoint block should be preserved, not deleted"); + + // Verify stale entry is removed or unwound + let stale = rocksdb.get::(stale_key).unwrap(); + assert!(stale.is_none(), "Stale entry after checkpoint should be removed"); + } + + /// Tests `AccountsHistory` changeset-based healing with enough blocks to trigger batching. + /// + /// Scenario: + /// 1. Generate 15,000 blocks worth of account changeset data (to exceed the 10k batch size) + /// 2. Each block has 1 account change (simple - just random addresses) + /// 3. Write account changesets to static files for all 15k blocks + /// 4. Set `IndexAccountHistory` checkpoint to block 5000 + /// 5. 
Insert stale `AccountsHistory` entries in `RocksDB` for addresses that changed in blocks + /// 5001-15000 + /// 6. Run `check_consistency` + /// 7. Verify: + /// - Stale entries for blocks > 5000 are pruned + /// - The batching worked (no OOM, completed successfully) + #[test] + fn test_check_consistency_accounts_history_heals_via_changesets_large_range() { + use reth_db::models::AccountBeforeTx; use reth_db_api::models::ShardedKey; + use reth_static_file_types::StaticFileSegment; let temp_dir = TempDir::new().unwrap(); let rocksdb = RocksDBBuilder::new(temp_dir.path()) @@ -1373,33 +1450,276 @@ mod tests { .build() .unwrap(); - // Insert data into RocksDB with highest_block_number below checkpoint - let key_block_50 = ShardedKey::new(Address::ZERO, 50); - let block_list = BlockNumberList::new_pre_sorted([10, 20, 30, 50]); - rocksdb.put::(key_block_50, &block_list).unwrap(); - + // Create test provider factory let factory = create_test_provider_factory(); factory.set_storage_settings_cache( - StorageSettings::legacy().with_account_history_in_rocksdb(true), + StorageSettings::legacy() + .with_account_history_in_rocksdb(true) + .with_account_changesets_in_static_files(true), ); - // Set checkpoint to block 100 + const TOTAL_BLOCKS: u64 = 15_000; + const CHECKPOINT_BLOCK: u64 = 5_000; + + // Helper to generate address from block number (avoids pre-allocating 15k addresses) + #[inline] + fn make_address(block_num: u64) -> Address { + let mut addr = Address::ZERO; + addr.0[0..8].copy_from_slice(&block_num.to_le_bytes()); + addr + } + + // Write account changesets to static files for all 15k blocks + { + let sf_provider = factory.static_file_provider(); + let mut writer = + sf_provider.latest_writer(StaticFileSegment::AccountChangeSets).unwrap(); + + // Reuse changeset vec to avoid repeated allocations + let mut changeset = Vec::with_capacity(1); + + for block_num in 0..TOTAL_BLOCKS { + changeset.clear(); + changeset.push(AccountBeforeTx { address: make_address(block_num), info: None }); + writer.append_account_changeset(changeset.clone(), block_num).unwrap(); + } + + writer.commit().unwrap(); + } + + // Insert stale AccountsHistory entries in RocksDB for addresses that changed + // in blocks 5001-15000 (i.e., blocks after the checkpoint) + // These should be pruned by check_consistency + for block_num in (CHECKPOINT_BLOCK + 1)..TOTAL_BLOCKS { + let key = ShardedKey::new(make_address(block_num), block_num); + let block_list = BlockNumberList::new_pre_sorted([block_num]); + rocksdb.put::(key, &block_list).unwrap(); + } + + // Also insert some valid entries for blocks <= 5000 that should NOT be pruned + for block_num in [100u64, 500, 1000, 2500, 5000] { + let key = ShardedKey::new(make_address(block_num), block_num); + let block_list = BlockNumberList::new_pre_sorted([block_num]); + rocksdb.put::(key, &block_list).unwrap(); + } + + // Verify we have entries before healing + let entries_before: usize = rocksdb.iter::().unwrap().count(); + let stale_count = (TOTAL_BLOCKS - CHECKPOINT_BLOCK - 1) as usize; + let valid_count = 5usize; + assert_eq!( + entries_before, + stale_count + valid_count, + "Should have {} stale + {} valid entries before healing", + stale_count, + valid_count + ); + + // Set IndexAccountHistory checkpoint to block 5000 { let provider = factory.database_provider_rw().unwrap(); provider - .save_stage_checkpoint(StageId::IndexAccountHistory, StageCheckpoint::new(100)) + .save_stage_checkpoint( + StageId::IndexAccountHistory, + StageCheckpoint::new(CHECKPOINT_BLOCK), + ) .unwrap(); 
provider.commit().unwrap(); } let provider = factory.database_provider_ro().unwrap(); - // RocksDB only has data up to block 50, but checkpoint says block 100 was processed + // Verify sf_tip > checkpoint + let sf_tip = provider + .static_file_provider() + .get_highest_static_file_block(StaticFileSegment::AccountChangeSets) + .unwrap(); + assert_eq!(sf_tip, TOTAL_BLOCKS - 1, "Static file tip should be 14999"); + assert!(sf_tip > CHECKPOINT_BLOCK, "sf_tip should be > checkpoint to trigger healing"); + + // Run check_consistency - this should trigger batched changeset-based healing let result = rocksdb.check_consistency(&provider).unwrap(); + assert_eq!(result, None, "Healing should succeed without requiring unwind"); + + // Verify: all stale entries for blocks > 5000 should be pruned + // Count remaining entries with highest_block_number > checkpoint + let mut remaining_stale = 0; + for result in rocksdb.iter::().unwrap() { + let (key, _) = result.unwrap(); + if key.highest_block_number > CHECKPOINT_BLOCK && key.highest_block_number != u64::MAX { + remaining_stale += 1; + } + } assert_eq!( - result, - Some(50), - "Should require unwind to block 50 to rebuild AccountsHistory" + remaining_stale, 0, + "All stale entries (block > {}) should be pruned", + CHECKPOINT_BLOCK ); } + + /// Tests that accounts history healing preserves entries at exactly the checkpoint block. + #[test] + fn test_check_consistency_accounts_history_preserves_checkpoint_block() { + use reth_db::models::AccountBeforeTx; + use reth_db_api::models::ShardedKey; + + const CHECKPOINT_BLOCK: u64 = 100; + const SF_TIP: u64 = 200; + + let temp_dir = TempDir::new().unwrap(); + let rocksdb = RocksDBBuilder::new(temp_dir.path()) + .with_table::() + .build() + .unwrap(); + + let factory = create_test_provider_factory(); + factory.set_storage_settings_cache( + StorageSettings::legacy() + .with_account_history_in_rocksdb(true) + .with_account_changesets_in_static_files(true), + ); + + let checkpoint_addr = Address::repeat_byte(0xAA); + let stale_addr = Address::repeat_byte(0xCC); + + // Write account changesets to static files + { + let sf_provider = factory.static_file_provider(); + let mut writer = + sf_provider.latest_writer(StaticFileSegment::AccountChangeSets).unwrap(); + + for block_num in 0..=SF_TIP { + let changeset = if block_num == CHECKPOINT_BLOCK { + vec![AccountBeforeTx { address: checkpoint_addr, info: None }] + } else if block_num > CHECKPOINT_BLOCK { + vec![AccountBeforeTx { address: stale_addr, info: None }] + } else { + vec![AccountBeforeTx { address: Address::ZERO, info: None }] + }; + writer.append_account_changeset(changeset, block_num).unwrap(); + } + writer.commit().unwrap(); + } + + // Set checkpoint + { + let provider = factory.database_provider_rw().unwrap(); + provider + .save_stage_checkpoint( + StageId::IndexAccountHistory, + StageCheckpoint::new(CHECKPOINT_BLOCK), + ) + .unwrap(); + provider.commit().unwrap(); + } + + // Insert entry AT the checkpoint block (should be preserved) + let checkpoint_key = ShardedKey::new(checkpoint_addr, CHECKPOINT_BLOCK); + let checkpoint_list = BlockNumberList::new_pre_sorted([CHECKPOINT_BLOCK]); + rocksdb.put::(checkpoint_key.clone(), &checkpoint_list).unwrap(); + + // Insert stale entry AFTER the checkpoint (should be removed) + let stale_key = ShardedKey::new(stale_addr, SF_TIP); + let stale_list = BlockNumberList::new_pre_sorted([CHECKPOINT_BLOCK + 1, SF_TIP]); + rocksdb.put::(stale_key.clone(), &stale_list).unwrap(); + + // Run healing + let provider = 
factory.database_provider_ro().unwrap(); + let result = rocksdb.check_consistency(&provider).unwrap(); + assert_eq!(result, None, "Should heal without unwind"); + + // Verify checkpoint block entry is PRESERVED + let preserved = rocksdb.get::(checkpoint_key).unwrap(); + assert!(preserved.is_some(), "Entry at checkpoint block should be preserved, not deleted"); + + // Verify stale entry is removed or unwound + let stale = rocksdb.get::(stale_key).unwrap(); + assert!(stale.is_none(), "Stale entry after checkpoint should be removed"); + } + + #[test] + fn test_check_consistency_storages_history_sf_tip_equals_checkpoint_no_action() { + use alloy_primitives::U256; + use reth_db::models::StorageBeforeTx; + use reth_static_file_types::StaticFileSegment; + + let temp_dir = TempDir::new().unwrap(); + let rocksdb = RocksDBBuilder::new(temp_dir.path()) + .with_table::() + .build() + .unwrap(); + + // Insert StoragesHistory entries into RocksDB + let key1 = StorageShardedKey::new(Address::ZERO, B256::ZERO, 50); + let key2 = StorageShardedKey::new(Address::random(), B256::random(), 80); + let block_list1 = BlockNumberList::new_pre_sorted([10, 20, 30, 50]); + let block_list2 = BlockNumberList::new_pre_sorted([40, 60, 80]); + rocksdb.put::(key1, &block_list1).unwrap(); + rocksdb.put::(key2, &block_list2).unwrap(); + + // Capture entries before consistency check + let entries_before: Vec<_> = + rocksdb.iter::().unwrap().map(|r| r.unwrap()).collect(); + + // Create a test provider factory + let factory = create_test_provider_factory(); + factory.set_storage_settings_cache( + StorageSettings::legacy().with_storages_history_in_rocksdb(true), + ); + + // Write storage changesets to static files for blocks 0-100 + { + let sf_provider = factory.static_file_provider(); + let mut writer = + sf_provider.latest_writer(StaticFileSegment::StorageChangeSets).unwrap(); + + for block_num in 0..=100u64 { + let changeset = vec![StorageBeforeTx { + address: Address::ZERO, + key: B256::with_last_byte(block_num as u8), + value: U256::from(block_num), + }]; + writer.append_storage_changeset(changeset, block_num).unwrap(); + } + writer.commit().unwrap(); + } + + // Set IndexStorageHistory checkpoint to block 100 (same as sf_tip) + { + let provider = factory.database_provider_rw().unwrap(); + provider + .save_stage_checkpoint(StageId::IndexStorageHistory, StageCheckpoint::new(100)) + .unwrap(); + provider.commit().unwrap(); + } + + let provider = factory.database_provider_ro().unwrap(); + + // Verify sf_tip equals checkpoint (both at 100) + let sf_tip = provider + .static_file_provider() + .get_highest_static_file_block(StaticFileSegment::StorageChangeSets) + .unwrap(); + assert_eq!(sf_tip, 100, "Static file tip should be 100"); + + // Run check_consistency - should return None (no unwind needed) + let result = rocksdb.check_consistency(&provider).unwrap(); + assert_eq!(result, None, "sf_tip == checkpoint should not require unwind"); + + // Verify NO entries are deleted - RocksDB state unchanged + let entries_after: Vec<_> = + rocksdb.iter::().unwrap().map(|r| r.unwrap()).collect(); + + assert_eq!( + entries_after.len(), + entries_before.len(), + "RocksDB entry count should be unchanged when sf_tip == checkpoint" + ); + + // Verify exact entries are preserved + for (before, after) in entries_before.iter().zip(entries_after.iter()) { + assert_eq!(before.0, after.0, "Entry key should be unchanged"); + assert_eq!(before.1, after.1, "Entry block list should be unchanged"); + } + } }