diff --git a/crates/stages/stages/src/stages/index_account_history.rs b/crates/stages/stages/src/stages/index_account_history.rs
index c8d6464cf3..b494ebf13c 100644
--- a/crates/stages/stages/src/stages/index_account_history.rs
+++ b/crates/stages/stages/src/stages/index_account_history.rs
@@ -2,7 +2,12 @@
 use super::{collect_history_indices, load_history_indices};
 use alloy_primitives::Address;
 use reth_config::config::{EtlConfig, IndexHistoryConfig};
 use reth_db_api::{models::ShardedKey, table::Decode, tables, transaction::DbTxMut};
-use reth_provider::{DBProvider, HistoryWriter, PruneCheckpointReader, PruneCheckpointWriter};
+#[cfg(all(unix, feature = "rocksdb"))]
+use reth_provider::EitherWriter;
+use reth_provider::{
+    DBProvider, HistoryWriter, NodePrimitivesProvider, PruneCheckpointReader,
+    PruneCheckpointWriter, RocksDBProviderFactory, StorageSettingsCache,
+};
 use reth_prune_types::{PruneCheckpoint, PruneMode, PrunePurpose, PruneSegment};
 use reth_stages_api::{
     ExecInput, ExecOutput, Stage, StageCheckpoint, StageError, StageId, UnwindInput, UnwindOutput,
@@ -43,8 +48,13 @@ impl Default for IndexAccountHistoryStage {
 impl<Provider> Stage<Provider> for IndexAccountHistoryStage
 where
-    Provider:
-        DBProvider + HistoryWriter + PruneCheckpointReader + PruneCheckpointWriter,
+    Provider: DBProvider
+        + HistoryWriter
+        + PruneCheckpointReader
+        + PruneCheckpointWriter
+        + StorageSettingsCache
+        + RocksDBProviderFactory
+        + NodePrimitivesProvider,
 {
     /// Return the id of the stage
     fn id(&self) -> StageId {
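The widened `where` clause is what lets the stage pick a storage backend at runtime: `StorageSettingsCache` exposes the cached per-table settings and `RocksDBProviderFactory` hands out the RocksDB handle. A minimal sketch of the gating pattern used in `execute` below, with the provider traits reduced to the one method the gate needs (the `StorageSettings` shape here is a simplified stand-in, not reth's full type):

```rust
// Simplified stand-in for reth's cached storage settings.
struct StorageSettings {
    account_history_in_rocksdb: bool,
}

trait StorageSettingsCache {
    fn cached_storage_settings(&self) -> StorageSettings;
}

fn backend_for<P: StorageSettingsCache>(provider: &P) -> &'static str {
    // Compile-time gate first, then the runtime setting, as in `execute`.
    #[cfg(all(unix, feature = "rocksdb"))]
    let use_rocksdb = provider.cached_storage_settings().account_history_in_rocksdb;
    #[cfg(not(all(unix, feature = "rocksdb")))]
    let use_rocksdb = false;

    if use_rocksdb { "rocksdb" } else { "mdbx" }
}
```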
@@ -110,15 +120,55 @@ where
             &self.etl_config,
         )?;
 
-        info!(target: "sync::stages::index_account_history::exec", "Loading indices into database");
-        load_history_indices::<_, tables::AccountsHistory, _>(
-            provider,
-            collector,
-            first_sync,
-            ShardedKey::new,
-            ShardedKey::<Address>::decode_owned,
-            |key| key.key,
-        )?;
+        // Check if RocksDB is enabled for account history
+        #[cfg(all(unix, feature = "rocksdb"))]
+        let use_rocksdb = provider.cached_storage_settings().account_history_in_rocksdb;
+        #[cfg(not(all(unix, feature = "rocksdb")))]
+        let use_rocksdb = false;
+
+        info!(target: "sync::stages::index_account_history::exec", ?use_rocksdb, "Loading indices into database");
+
+        if use_rocksdb {
+            #[cfg(all(unix, feature = "rocksdb"))]
+            {
+                // Create RocksDB batch
+                let rocksdb = provider.rocksdb_provider();
+                let mut rocksdb_batch = rocksdb.batch();
+
+                if first_sync {
+                    super::clear_rocksdb_table::<tables::AccountsHistory>(
+                        &rocksdb,
+                        &mut rocksdb_batch,
+                    )?;
+                }
+
+                // Create writer that routes to RocksDB
+                let mut writer = EitherWriter::new_accounts_history(provider, rocksdb_batch)?;
+
+                // Load indices using the RocksDB path
+                super::load_account_history_indices_via_writer(
+                    &mut writer,
+                    collector,
+                    first_sync,
+                    provider,
+                )?;
+
+                // Extract and register RocksDB batch for commit at provider level
+                if let Some(batch) = writer.into_raw_rocksdb_batch() {
+                    provider.set_pending_rocksdb_batch(batch);
+                }
+            }
+        } else {
+            // Keep existing MDBX path unchanged
+            load_history_indices::<_, tables::AccountsHistory, _>(
+                provider,
+                collector,
+                first_sync,
+                ShardedKey::new,
+                ShardedKey::<Address>::decode_owned,
+                |key| key.key,
+            )?;
+        }
 
         Ok(ExecOutput { checkpoint: StageCheckpoint::new(*range.end()), done: true })
     }
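Nothing in the RocksDB branch above touches storage directly: the stage only enqueues writes into a batch and then parks that batch on the provider via `set_pending_rocksdb_batch`, so it can be committed together with the MDBX transaction. A sketch of that lifecycle against the raw `rocksdb` crate (reth wraps this in `RocksDBProvider`/`RocksDBBatch`; the key and value bytes are illustrative):

```rust
use rocksdb::{WriteBatch, DB};

fn stage_writes(db: &DB) -> Result<(), rocksdb::Error> {
    let mut batch = WriteBatch::default();
    // The stage only enqueues operations; nothing hits disk yet.
    batch.put(b"account_history/0xab...", b"block list bytes");
    batch.delete(b"account_history/stale");
    // The provider later writes the whole batch in one shot, alongside the
    // MDBX commit, so both backends observe the stage's output together.
    db.write(batch)
}
```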
@@ -132,9 +182,40 @@ where
         let (range, unwind_progress, _) =
             input.unwind_block_range_with_threshold(self.commit_threshold);
 
-        provider.unwind_account_history_indices_range(range)?;
+        // Check if RocksDB is enabled for account history
+        #[cfg(all(unix, feature = "rocksdb"))]
+        let use_rocksdb = provider.cached_storage_settings().account_history_in_rocksdb;
+        #[cfg(not(all(unix, feature = "rocksdb")))]
+        let use_rocksdb = false;
+
+        if use_rocksdb {
+            #[cfg(all(unix, feature = "rocksdb"))]
+            {
+                use reth_db_api::{cursor::DbCursorRO, transaction::DbTx};
+                use std::collections::BTreeSet;
+
+                // Stream changesets directly into a set of affected addresses
+                let mut affected_addresses = BTreeSet::new();
+                for entry in provider
+                    .tx_ref()
+                    .cursor_read::<tables::AccountChangeSets>()?
+                    .walk_range(range.clone())?
+                {
+                    let (_block, account) = entry?;
+                    affected_addresses.insert(account.address);
+                }
+
+                // Unwind using RocksDB
+                super::unwind_account_history_via_rocksdb(
+                    provider,
+                    affected_addresses,
+                    *range.start(),
+                )?;
+            }
+        } else {
+            provider.unwind_account_history_indices_range(range)?;
+        }
 
-        // from HistoryIndex higher than that number.
         Ok(UnwindOutput { checkpoint: StageCheckpoint::new(unwind_progress) })
     }
 }
@@ -632,4 +713,427 @@ mod tests {
             Ok(())
         }
     }
+
+    #[cfg(all(unix, feature = "rocksdb"))]
+    mod rocksdb_tests {
+        use super::*;
+        use reth_provider::RocksDBProviderFactory;
+        use reth_storage_api::StorageSettings;
+
+        #[tokio::test]
+        async fn execute_writes_account_history_to_rocksdb_when_enabled() {
+            let db = TestStageDB::default();
+
+            // Enable RocksDB for account history
+            db.factory.set_storage_settings_cache(
+                StorageSettings::legacy().with_account_history_in_rocksdb(true),
+            );
+
+            // setup
+            partial_setup(&db);
+
+            // run
+            run(&db, MAX_BLOCK, None);
+
+            // Verify the MDBX table is empty (data should be in RocksDB)
+            let mdbx_count = db.table::<tables::AccountsHistory>().unwrap().len();
+            assert_eq!(
+                mdbx_count, 0,
+                "MDBX AccountsHistory should be empty when RocksDB is enabled"
+            );
+
+            // Verify RocksDB has the data
+            let rocksdb = db.factory.rocksdb_provider();
+            let result = rocksdb.get::<tables::AccountsHistory>(shard(u64::MAX)).unwrap();
+            assert!(result.is_some(), "Account history should exist in RocksDB");
+
+            // Verify the block numbers are correct
+            let list = result.unwrap();
+            let blocks: Vec<u64> = list.iter().collect();
+            assert!(!blocks.is_empty(), "Block list should not be empty");
+        }
+
+        #[tokio::test]
+        async fn first_sync_clears_stale_rocksdb_account_history() {
+            let db = TestStageDB::default();
+
+            // Enable RocksDB for account history
+            db.factory.set_storage_settings_cache(
+                StorageSettings::legacy().with_account_history_in_rocksdb(true),
+            );
+
+            // Seed RocksDB with a stale entry for a different address
+            let rocksdb = db.factory.rocksdb_provider();
+            let stale_address = address!("0x0000000000000000000000000000000000000002");
+            let stale_key = ShardedKey { key: stale_address, highest_block_number: u64::MAX };
+            rocksdb.put::<tables::AccountsHistory>(stale_key.clone(), &list(&[999])).unwrap();
+
+            // setup
+            partial_setup(&db);
+
+            // run (first sync)
+            run(&db, MAX_BLOCK, None);
+
+            // Verify the stale entry is removed
+            let rocksdb = db.factory.rocksdb_provider();
+            let stale = rocksdb.get::<tables::AccountsHistory>(stale_key).unwrap();
+            assert!(
+                stale.is_none(),
+                "Stale RocksDB account history should be cleared on first sync"
+            );
+        }
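The tests above and below read indices back through a `shard(u64::MAX)` helper from the surrounding `tests` module. It targets the sentinel shard, the shard keyed with `highest_block_number == u64::MAX` that always holds a key's most recent blocks; a plausible shape of that helper, assuming the fixture address used by these tests:

```rust
use alloy_primitives::{address, Address};
use reth_db_api::models::ShardedKey;

/// Builds the lookup key for the fixture account's shard with the given
/// highest block number; `shard(u64::MAX)` selects the sentinel shard.
fn shard(highest_block_number: u64) -> ShardedKey<Address> {
    ShardedKey::new(
        address!("0x0000000000000000000000000000000000000001"),
        highest_block_number,
    )
}
```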
+
+        #[tokio::test]
+        async fn execute_writes_to_mdbx_when_rocksdb_disabled() {
+            let db = TestStageDB::default();
+
+            // Ensure RocksDB is disabled for account history (the default)
+            db.factory.set_storage_settings_cache(
+                StorageSettings::legacy().with_account_history_in_rocksdb(false),
+            );
+
+            // setup
+            partial_setup(&db);
+
+            // run
+            run(&db, MAX_BLOCK, None);
+
+            // Verify the MDBX table has data
+            let mdbx_count = db.table::<tables::AccountsHistory>().unwrap().len();
+            assert!(
+                mdbx_count > 0,
+                "MDBX AccountsHistory should have data when RocksDB is disabled"
+            );
+        }
+
+        /// Test incremental sync with RocksDB: run the stage twice and verify indices are merged.
+        #[tokio::test]
+        async fn incremental_sync_merges_indices_in_rocksdb() {
+            let db = TestStageDB::default();
+
+            // Enable RocksDB for account history
+            db.factory.set_storage_settings_cache(
+                StorageSettings::legacy().with_account_history_in_rocksdb(true),
+            );
+
+            // First run: set up changesets for blocks 0-5 and index them
+            let first_run_end: u64 = 5;
+            db.commit(|tx| {
+                for block in 0..=first_run_end {
+                    tx.put::<tables::BlockBodyIndices>(
+                        block,
+                        StoredBlockBodyIndices { tx_count: 3, ..Default::default() },
+                    )?;
+                    tx.put::<tables::AccountChangeSets>(block, acc())?;
+                }
+                Ok(())
+            })
+            .unwrap();
+
+            // Run the stage for the first batch (first_sync = true since the checkpoint is 0)
+            run(&db, first_run_end, None);
+
+            // Verify the first run wrote to RocksDB
+            let rocksdb = db.factory.rocksdb_provider();
+            let first_result = rocksdb.get::<tables::AccountsHistory>(shard(u64::MAX)).unwrap();
+            assert!(first_result.is_some(), "First run should write to RocksDB");
+            let first_blocks: Vec<u64> = first_result.unwrap().iter().collect();
+            assert_eq!(
+                first_blocks.len(),
+                (first_run_end + 1) as usize,
+                "First run should have blocks 0-5"
+            );
+
+            // Second run: add changesets for blocks 6-10
+            let second_run_end: u64 = 10;
+            db.commit(|tx| {
+                for block in (first_run_end + 1)..=second_run_end {
+                    tx.put::<tables::BlockBodyIndices>(
+                        block,
+                        StoredBlockBodyIndices { tx_count: 3, ..Default::default() },
+                    )?;
+                    tx.put::<tables::AccountChangeSets>(block, acc())?;
+                }
+                Ok(())
+            })
+            .unwrap();
+
+            // Run the stage for the second batch (first_sync = false since the checkpoint > 0)
+            run(&db, second_run_end, Some(first_run_end));
+
+            // Verify the second run merged indices in RocksDB
+            let rocksdb = db.factory.rocksdb_provider();
+            let merged_result = rocksdb.get::<tables::AccountsHistory>(shard(u64::MAX)).unwrap();
+            assert!(merged_result.is_some(), "Second run should have data in RocksDB");
+            let merged_blocks: Vec<u64> = merged_result.unwrap().iter().collect();
+
+            // Should contain all blocks from 0 to 10
+            let expected_blocks: Vec<u64> = (0..=second_run_end).collect();
+            assert_eq!(
+                merged_blocks, expected_blocks,
+                "Incremental sync should merge all indices: expected {:?}, got {:?}",
+                expected_blocks, merged_blocks
+            );
+
+            // Verify the MDBX table is still empty
+            let mdbx_count = db.table::<tables::AccountsHistory>().unwrap().len();
+            assert_eq!(mdbx_count, 0, "MDBX should remain empty when RocksDB is enabled");
+        }
+
+        /// Test unwind removes account history from RocksDB when enabled.
+        #[tokio::test]
+        async fn unwind_removes_account_history_from_rocksdb() {
+            let db = TestStageDB::default();
+
+            // Enable RocksDB for account history
+            db.factory.set_storage_settings_cache(
+                StorageSettings::legacy().with_account_history_in_rocksdb(true),
+            );
+
+            // setup changesets for blocks 0-10
+            db.commit(|tx| {
+                for block in 0..=10u64 {
+                    tx.put::<tables::BlockBodyIndices>(
+                        block,
+                        StoredBlockBodyIndices { tx_count: 3, ..Default::default() },
+                    )?;
+                    tx.put::<tables::AccountChangeSets>(block, acc())?;
+                }
+                Ok(())
+            })
+            .unwrap();
+
+            // Run the stage to index blocks 0-10
+            run(&db, 10, None);
+
+            // Verify RocksDB has all blocks
+            let rocksdb = db.factory.rocksdb_provider();
+            let before_unwind = rocksdb.get::<tables::AccountsHistory>(shard(u64::MAX)).unwrap();
+            assert!(before_unwind.is_some(), "RocksDB should have data before unwind");
+            let before_blocks: Vec<u64> = before_unwind.unwrap().iter().collect();
+            assert_eq!(before_blocks, (0..=10).collect::<Vec<u64>>(), "Should have blocks 0-10");
+
+            // Unwind to block 5
+            unwind(&db, 10, 5);
+
+            // Verify RocksDB only contains blocks 0-5 (blocks <= unwind_to are kept)
+            let rocksdb = db.factory.rocksdb_provider();
+            let after_unwind = rocksdb.get::<tables::AccountsHistory>(shard(u64::MAX)).unwrap();
+            assert!(after_unwind.is_some(), "RocksDB should still have data after partial unwind");
+            let after_blocks: Vec<u64> = after_unwind.unwrap().iter().collect();
+            assert_eq!(
+                after_blocks,
+                (0..=5).collect::<Vec<u64>>(),
+                "After unwind to 5, should have blocks 0-5"
+            );
+
+            // Verify the MDBX table is still empty
+            let mdbx_count = db.table::<tables::AccountsHistory>().unwrap().len();
+            assert_eq!(mdbx_count, 0, "MDBX should remain empty during RocksDB unwind");
+        }
+
+        /// Test unwind to genesis removes all account history from RocksDB.
+        #[tokio::test]
+        async fn unwind_to_genesis_clears_rocksdb_account_history() {
+            let db = TestStageDB::default();
+
+            // Enable RocksDB for account history
+            db.factory.set_storage_settings_cache(
+                StorageSettings::legacy().with_account_history_in_rocksdb(true),
+            );
+
+            // setup changesets for blocks 0-5
+            db.commit(|tx| {
+                for block in 0..=5u64 {
+                    tx.put::<tables::BlockBodyIndices>(
+                        block,
+                        StoredBlockBodyIndices { tx_count: 3, ..Default::default() },
+                    )?;
+                    tx.put::<tables::AccountChangeSets>(block, acc())?;
+                }
+                Ok(())
+            })
+            .unwrap();
+
+            // Run the stage
+            run(&db, 5, None);
+
+            // Verify RocksDB has data
+            let rocksdb = db.factory.rocksdb_provider();
+            let before = rocksdb.get::<tables::AccountsHistory>(shard(u64::MAX)).unwrap();
+            assert!(before.is_some(), "RocksDB should have data before unwind");
+
+            // Unwind to genesis (block 0)
+            unwind(&db, 5, 0);
+
+            // Verify RocksDB still has block 0 (blocks <= unwind_to are kept)
+            let rocksdb = db.factory.rocksdb_provider();
+            let after = rocksdb.get::<tables::AccountsHistory>(shard(u64::MAX)).unwrap();
+            assert!(after.is_some(), "RocksDB should still have block 0 after unwind to genesis");
+            let after_blocks: Vec<u64> = after.unwrap().iter().collect();
+            assert_eq!(after_blocks, vec![0], "After unwind to 0, should only have block 0");
+        }
+
+        /// Test unwind with no changesets in range is a no-op.
+        #[tokio::test]
+        async fn unwind_with_no_changesets_is_noop() {
+            let db = TestStageDB::default();
+
+            // Enable RocksDB for account history
+            db.factory.set_storage_settings_cache(
+                StorageSettings::legacy().with_account_history_in_rocksdb(true),
+            );
+
+            // Setup changesets only for blocks 0-5 (leave a gap at 6-10)
+            db.commit(|tx| {
+                for block in 0..=5u64 {
+                    tx.put::<tables::BlockBodyIndices>(
+                        block,
+                        StoredBlockBodyIndices { tx_count: 3, ..Default::default() },
+                    )?;
+                    tx.put::<tables::AccountChangeSets>(block, acc())?;
+                }
+                // Add block body indices for 6-10 but NO changesets
+                for block in 6..=10u64 {
+                    tx.put::<tables::BlockBodyIndices>(
+                        block,
+                        StoredBlockBodyIndices { tx_count: 3, ..Default::default() },
+                    )?;
+                }
+                Ok(())
+            })
+            .unwrap();
+
+            // Run the stage to index blocks 0-10
+            run(&db, 10, None);
+
+            // Record the state before unwind
+            let rocksdb = db.factory.rocksdb_provider();
+            let before = rocksdb.get::<tables::AccountsHistory>(shard(u64::MAX)).unwrap();
+            let before_blocks: Vec<u64> = before.unwrap().iter().collect();
+
+            // Unwind from 10 to 7 (range 8-10 has no changesets)
+            unwind(&db, 10, 7);
+
+            // Verify the RocksDB data is unchanged (no changesets in the unwind range)
+            let rocksdb = db.factory.rocksdb_provider();
+            let after = rocksdb.get::<tables::AccountsHistory>(shard(u64::MAX)).unwrap();
+            let after_blocks: Vec<u64> = after.unwrap().iter().collect();
+            assert_eq!(
+                before_blocks, after_blocks,
+                "Data should be unchanged when no changesets in unwind range"
+            );
+        }
+
+        /// Test unwind preserves unrelated addresses.
+        #[tokio::test]
+        async fn unwind_preserves_unrelated_addresses() {
+            let db = TestStageDB::default();
+
+            // Enable RocksDB for account history
+            db.factory.set_storage_settings_cache(
+                StorageSettings::legacy().with_account_history_in_rocksdb(true),
+            );
+
+            let address_a = address!("0x0000000000000000000000000000000000000001");
+            let address_b = address!("0x0000000000000000000000000000000000000002");
+
+            // Setup: address_a has changes in blocks 0-10, address_b only in blocks 0-5
+            db.commit(|tx| {
+                for block in 0..=10u64 {
+                    tx.put::<tables::BlockBodyIndices>(
+                        block,
+                        StoredBlockBodyIndices { tx_count: 3, ..Default::default() },
+                    )?;
+                    // address_a changes in all blocks
+                    tx.put::<tables::AccountChangeSets>(
+                        block,
+                        AccountBeforeTx { address: address_a, info: None },
+                    )?;
+                    // address_b only changes in blocks 0-5
+                    if block <= 5 {
+                        tx.put::<tables::AccountChangeSets>(
+                            block,
+                            AccountBeforeTx { address: address_b, info: None },
+                        )?;
+                    }
+                }
+                Ok(())
+            })
+            .unwrap();
+
+            // Run the stage
+            run(&db, 10, None);
+
+            // Record address_b state before unwind
+            let rocksdb = db.factory.rocksdb_provider();
+            let key_b = ShardedKey { key: address_b, highest_block_number: u64::MAX };
+            let before_b = rocksdb.get::<tables::AccountsHistory>(key_b.clone()).unwrap();
+            let before_b_blocks: Vec<u64> = before_b.unwrap().iter().collect();
+
+            // Unwind from 10 to 7 (only address_a has changes in 8-10)
+            unwind(&db, 10, 7);
+
+            // Verify address_b is unchanged (no changes in the unwind range)
+            let rocksdb = db.factory.rocksdb_provider();
+            let after_b = rocksdb.get::<tables::AccountsHistory>(key_b).unwrap();
+            let after_b_blocks: Vec<u64> = after_b.unwrap().iter().collect();
+            assert_eq!(
+                before_b_blocks, after_b_blocks,
+                "Address B should be unchanged when it has no changes in unwind range"
+            );
+
+            // Verify address_a was properly unwound
+            let key_a = ShardedKey { key: address_a, highest_block_number: u64::MAX };
+            let after_a = rocksdb.get::<tables::AccountsHistory>(key_a).unwrap();
+            let after_a_blocks: Vec<u64> = after_a.unwrap().iter().collect();
+            assert_eq!(
+                after_a_blocks,
+                (0..=7).collect::<Vec<u64>>(),
+                "Address A should have blocks 0-7 after unwind"
+            );
+        }
+
+        /// Test unwind deletes the entry when all blocks are removed.
+        #[tokio::test]
+        async fn unwind_deletes_entry_when_all_blocks_removed() {
+            let db = TestStageDB::default();
+
+            // Enable RocksDB for account history
+            db.factory.set_storage_settings_cache(
+                StorageSettings::legacy().with_account_history_in_rocksdb(true),
+            );
+
+            // Setup: only add changesets for blocks 5-10
+            db.commit(|tx| {
+                for block in 0..=10u64 {
+                    tx.put::<tables::BlockBodyIndices>(
+                        block,
+                        StoredBlockBodyIndices { tx_count: 3, ..Default::default() },
+                    )?;
+                    if block >= 5 {
+                        tx.put::<tables::AccountChangeSets>(block, acc())?;
+                    }
+                }
+                Ok(())
+            })
+            .unwrap();
+
+            // Run the stage
+            run(&db, 10, None);
+
+            // Verify RocksDB has data
+            let rocksdb = db.factory.rocksdb_provider();
+            let before = rocksdb.get::<tables::AccountsHistory>(shard(u64::MAX)).unwrap();
+            assert!(before.is_some(), "RocksDB should have data before unwind");
+
+            // Unwind to block 4 (removes blocks 5-10, which is ALL the data)
+            unwind(&db, 10, 4);
+
+            // Verify the entry is deleted (no blocks left)
+            let rocksdb = db.factory.rocksdb_provider();
+            let after = rocksdb.get::<tables::AccountsHistory>(shard(u64::MAX)).unwrap();
+            assert!(after.is_none(), "Entry should be deleted when all blocks are removed");
+        }
+    }
 }
diff --git a/crates/stages/stages/src/stages/index_storage_history.rs b/crates/stages/stages/src/stages/index_storage_history.rs
index 2ec4094c1e..49dd3b8fd0 100644
--- a/crates/stages/stages/src/stages/index_storage_history.rs
+++ b/crates/stages/stages/src/stages/index_storage_history.rs
@@ -7,7 +7,12 @@ use reth_db_api::{
     tables,
     transaction::DbTxMut,
 };
-use reth_provider::{DBProvider, HistoryWriter, PruneCheckpointReader, PruneCheckpointWriter};
+#[cfg(all(unix, feature = "rocksdb"))]
+use reth_provider::EitherWriter;
+use reth_provider::{
+    DBProvider, HistoryWriter, NodePrimitivesProvider, PruneCheckpointReader,
+    PruneCheckpointWriter, RocksDBProviderFactory, StorageSettingsCache,
+};
 use reth_prune_types::{PruneCheckpoint, PruneMode, PrunePurpose, PruneSegment};
 use reth_stages_api::{ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput};
 use std::fmt::Debug;
@@ -46,8 +51,13 @@ impl Default for IndexStorageHistoryStage {
 impl<Provider> Stage<Provider> for IndexStorageHistoryStage
 where
-    Provider:
-        DBProvider + PruneCheckpointWriter + HistoryWriter + PruneCheckpointReader,
+    Provider: DBProvider
+        + PruneCheckpointWriter
+        + HistoryWriter
+        + PruneCheckpointReader
+        + StorageSettingsCache
+        + RocksDBProviderFactory
+        + NodePrimitivesProvider,
 {
     /// Return the id of the stage
     fn id(&self) -> StageId {
@@ -115,17 +125,57 @@ where
             &self.etl_config,
         )?;
 
-        info!(target: "sync::stages::index_storage_history::exec", "Loading indices into database");
-        load_history_indices::<_, tables::StoragesHistory, _>(
-            provider,
-            collector,
-            first_sync,
-            |AddressStorageKey((address, storage_key)), highest_block_number| {
-                StorageShardedKey::new(address, storage_key, highest_block_number)
-            },
-            StorageShardedKey::decode_owned,
-            |key| AddressStorageKey((key.address, key.sharded_key.key)),
-        )?;
+        // Check if RocksDB is enabled for storage history
+        #[cfg(all(unix, feature = "rocksdb"))]
+        let use_rocksdb = provider.cached_storage_settings().storages_history_in_rocksdb;
+        #[cfg(not(all(unix, feature = "rocksdb")))]
+        let use_rocksdb = false;
+
+        info!(target: "sync::stages::index_storage_history::exec", ?use_rocksdb, "Loading indices into database");
+
+        if use_rocksdb {
+            #[cfg(all(unix, feature = "rocksdb"))]
+            {
+                // Create RocksDB batch
+                let rocksdb = provider.rocksdb_provider();
+                let mut rocksdb_batch = rocksdb.batch();
+
+                if first_sync {
+                    super::clear_rocksdb_table::<tables::StoragesHistory>(
+                        &rocksdb,
+                        &mut rocksdb_batch,
+                    )?;
+                }
+
+                // Create writer that routes to RocksDB
+                let mut writer = EitherWriter::new_storages_history(provider, rocksdb_batch)?;
+
+                // Load indices using the RocksDB path
+                super::load_storage_history_indices_via_writer(
+                    &mut writer,
+                    collector,
+                    first_sync,
+                    provider,
+                )?;
+
+                // Extract and register RocksDB batch for commit at provider level
+                if let Some(batch) = writer.into_raw_rocksdb_batch() {
+                    provider.set_pending_rocksdb_batch(batch);
+                }
+            }
+        } else {
+            // Keep existing MDBX path unchanged
+            load_history_indices::<_, tables::StoragesHistory, _>(
+                provider,
+                collector,
+                first_sync,
+                |AddressStorageKey((address, storage_key)), highest_block_number| {
+                    StorageShardedKey::new(address, storage_key, highest_block_number)
+                },
+                StorageShardedKey::decode_owned,
+                |key| AddressStorageKey((key.address, key.sharded_key.key)),
+            )?;
+        }
 
         Ok(ExecOutput { checkpoint: StageCheckpoint::new(*range.end()), done: true })
     }
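Unlike the account stage, the storage unwind below walks `StorageChangeSets`, which is keyed by `(block, address)` pairs, so the block range must first be widened into a key range via `BlockNumberAddress::range`. A rough model of that widening (the real type lives in `reth_db_api`; this reimplementation and its exact endpoints are illustrative, not reth's definition):

```rust
use alloy_primitives::Address;
use std::ops::RangeInclusive;

/// Illustrative stand-in for `BlockNumberAddress`.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
struct BlockNumberAddress((u64, Address));

impl BlockNumberAddress {
    /// Widen `start..=end` over blocks into the covering key range:
    /// from (start, lowest address) to (end, highest address).
    fn range(blocks: RangeInclusive<u64>) -> RangeInclusive<Self> {
        let start = Self((*blocks.start(), Address::ZERO));
        let end = Self((*blocks.end(), Address::repeat_byte(0xff)));
        start..=end
    }
}
```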
@@ -139,7 +189,35 @@ where
         let (range, unwind_progress, _) =
             input.unwind_block_range_with_threshold(self.commit_threshold);
 
-        provider.unwind_storage_history_indices_range(BlockNumberAddress::range(range))?;
+        // Check if RocksDB is enabled for storage history
+        #[cfg(all(unix, feature = "rocksdb"))]
+        let use_rocksdb = provider.cached_storage_settings().storages_history_in_rocksdb;
+        #[cfg(not(all(unix, feature = "rocksdb")))]
+        let use_rocksdb = false;
+
+        if use_rocksdb {
+            #[cfg(all(unix, feature = "rocksdb"))]
+            {
+                use reth_db_api::{cursor::DbCursorRO, transaction::DbTx};
+                use std::collections::BTreeSet;
+
+                // Stream changesets directly into a set of affected (address, storage_key) pairs
+                let mut affected_keys = BTreeSet::new();
+                for entry in provider
+                    .tx_ref()
+                    .cursor_read::<tables::StorageChangeSets>()?
+                    .walk_range(BlockNumberAddress::range(range.clone()))?
+                {
+                    let (block_address, storage_entry) = entry?;
+                    affected_keys.insert((block_address.address(), storage_entry.key));
+                }
+
+                // Unwind using RocksDB
+                super::unwind_storage_history_via_rocksdb(provider, affected_keys, *range.start())?;
+            }
+        } else {
+            provider.unwind_storage_history_indices_range(BlockNumberAddress::range(range))?;
+        }
 
         Ok(UnwindOutput { checkpoint: StageCheckpoint::new(unwind_progress) })
     }
@@ -663,4 +741,456 @@ mod tests {
             Ok(())
         }
     }
+
+    #[cfg(all(unix, feature = "rocksdb"))]
+    mod rocksdb_tests {
+        use super::*;
+        use reth_provider::RocksDBProviderFactory;
+        use reth_storage_api::StorageSettings;
+
+        #[tokio::test]
+        async fn execute_writes_storage_history_to_rocksdb_when_enabled() {
+            let db = TestStageDB::default();
+
+            // Enable RocksDB for storage history
+            db.factory.set_storage_settings_cache(
+                StorageSettings::legacy().with_storages_history_in_rocksdb(true),
+            );
+
+            // setup
+            partial_setup(&db);
+
+            // run
+            run(&db, MAX_BLOCK, None);
+
+            // Verify the MDBX table is empty (data should be in RocksDB)
+            let mdbx_count = db.table::<tables::StoragesHistory>().unwrap().len();
+            assert_eq!(
+                mdbx_count, 0,
+                "MDBX StoragesHistory should be empty when RocksDB is enabled"
+            );
+
+            // Verify RocksDB has the data
+            let rocksdb = db.factory.rocksdb_provider();
+            let result = rocksdb.get::<tables::StoragesHistory>(shard(u64::MAX)).unwrap();
+            assert!(result.is_some(), "Storage history should exist in RocksDB");
+
+            // Verify the block numbers are correct
+            let list = result.unwrap();
+            let blocks: Vec<u64> = list.iter().collect();
+            assert!(!blocks.is_empty(), "Block list should not be empty");
+        }
+
+        #[tokio::test]
+        async fn first_sync_clears_stale_rocksdb_storage_history() {
+            let db = TestStageDB::default();
+
+            // Enable RocksDB for storage history
+            db.factory.set_storage_settings_cache(
+                StorageSettings::legacy().with_storages_history_in_rocksdb(true),
+            );
+
+            // Seed RocksDB with a stale entry for a different address/storage key
+            let rocksdb = db.factory.rocksdb_provider();
+            let stale_address = address!("0x0000000000000000000000000000000000000002");
+            let stale_storage_key =
+                b256!("0x0000000000000000000000000000000000000000000000000000000000000002");
+            let stale_key = StorageShardedKey {
+                address: stale_address,
+                sharded_key: ShardedKey { key: stale_storage_key, highest_block_number: u64::MAX },
+            };
+            rocksdb.put::<tables::StoragesHistory>(stale_key.clone(), &list(&[999])).unwrap();
+
+            // setup
+            partial_setup(&db);
+
+            // run (first sync)
+            run(&db, MAX_BLOCK, None);
+
+            // Verify the stale entry is removed
+            let rocksdb = db.factory.rocksdb_provider();
+            let stale = rocksdb.get::<tables::StoragesHistory>(stale_key).unwrap();
+            assert!(
+                stale.is_none(),
+                "Stale RocksDB storage history should be cleared on first sync"
+            );
+        }
+
+        #[tokio::test]
+        async fn execute_writes_to_mdbx_when_rocksdb_disabled() {
+            let db = TestStageDB::default();
+
+            // Ensure RocksDB is disabled for storage history (the default)
+            db.factory.set_storage_settings_cache(
+                StorageSettings::legacy().with_storages_history_in_rocksdb(false),
+            );
+
+            // setup
+            partial_setup(&db);
+
+            // run
+            run(&db, MAX_BLOCK, None);
+
+            // Verify the MDBX table has data
+            let mdbx_count = db.table::<tables::StoragesHistory>().unwrap().len();
+            assert!(
+                mdbx_count > 0,
+                "MDBX StoragesHistory should have data when RocksDB is disabled"
+            );
+        }
+
+        /// Test incremental sync with RocksDB: run the stage twice and verify indices are merged.
+        #[tokio::test]
+        async fn incremental_sync_merges_indices_in_rocksdb() {
+            let db = TestStageDB::default();
+
+            // Enable RocksDB for storage history
+            db.factory.set_storage_settings_cache(
+                StorageSettings::legacy().with_storages_history_in_rocksdb(true),
+            );
+
+            // First run: set up changesets for blocks 0-5 and index them
+            let first_run_end: u64 = 5;
+            db.commit(|tx| {
+                for block in 0..=first_run_end {
+                    tx.put::<tables::BlockBodyIndices>(
+                        block,
+                        StoredBlockBodyIndices { tx_count: 3, ..Default::default() },
+                    )?;
+                    tx.put::<tables::StorageChangeSets>(
+                        block_number_address(block),
+                        storage(STORAGE_KEY),
+                    )?;
+                }
+                Ok(())
+            })
+            .unwrap();
+
+            // Run the stage for the first batch (first_sync = true since the checkpoint is 0)
+            run(&db, first_run_end, None);
+
+            // Verify the first run wrote to RocksDB
+            let rocksdb = db.factory.rocksdb_provider();
+            let first_result = rocksdb.get::<tables::StoragesHistory>(shard(u64::MAX)).unwrap();
+            assert!(first_result.is_some(), "First run should write to RocksDB");
+            let first_blocks: Vec<u64> = first_result.unwrap().iter().collect();
+            assert_eq!(
+                first_blocks.len(),
+                (first_run_end + 1) as usize,
+                "First run should have blocks 0-5"
+            );
+
+            // Second run: add changesets for blocks 6-10
+            let second_run_end: u64 = 10;
+            db.commit(|tx| {
+                for block in (first_run_end + 1)..=second_run_end {
+                    tx.put::<tables::BlockBodyIndices>(
+                        block,
+                        StoredBlockBodyIndices { tx_count: 3, ..Default::default() },
+                    )?;
+                    tx.put::<tables::StorageChangeSets>(
+                        block_number_address(block),
+                        storage(STORAGE_KEY),
+                    )?;
+                }
+                Ok(())
+            })
+            .unwrap();
+
+            // Run the stage for the second batch (first_sync = false since the checkpoint > 0)
+            run(&db, second_run_end, Some(first_run_end));
+
+            // Verify the second run merged indices in RocksDB
+            let rocksdb = db.factory.rocksdb_provider();
+            let merged_result = rocksdb.get::<tables::StoragesHistory>(shard(u64::MAX)).unwrap();
+            assert!(merged_result.is_some(), "Second run should have data in RocksDB");
+            let merged_blocks: Vec<u64> = merged_result.unwrap().iter().collect();
+
+            // Should contain all blocks from 0 to 10
+            let expected_blocks: Vec<u64> = (0..=second_run_end).collect();
+            assert_eq!(
+                merged_blocks, expected_blocks,
+                "Incremental sync should merge all indices: expected {:?}, got {:?}",
+                expected_blocks, merged_blocks
+            );
+
+            // Verify the MDBX table is still empty
+            let mdbx_count = db.table::<tables::StoragesHistory>().unwrap().len();
+            assert_eq!(mdbx_count, 0, "MDBX should remain empty when RocksDB is enabled");
+        }
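Storage history keys add one more level of nesting than account history: the slot's `ShardedKey` hangs off the account address. The sentinel key these storage tests query is therefore built like this (mirrors the struct literals used in the tests; the helper name is hypothetical):

```rust
use alloy_primitives::{Address, B256};
use reth_db_api::models::{storage_sharded_key::StorageShardedKey, ShardedKey};

/// Sentinel shard key for one (account, storage slot) pair.
fn sentinel(address: Address, slot: B256) -> StorageShardedKey {
    StorageShardedKey {
        address,
        sharded_key: ShardedKey { key: slot, highest_block_number: u64::MAX },
    }
}
```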
+
+        /// Test unwind removes storage history from RocksDB when enabled.
+        #[tokio::test]
+        async fn unwind_removes_storage_history_from_rocksdb() {
+            let db = TestStageDB::default();
+
+            // Enable RocksDB for storage history
+            db.factory.set_storage_settings_cache(
+                StorageSettings::legacy().with_storages_history_in_rocksdb(true),
+            );
+
+            // setup changesets for blocks 0-10
+            db.commit(|tx| {
+                for block in 0..=10u64 {
+                    tx.put::<tables::BlockBodyIndices>(
+                        block,
+                        StoredBlockBodyIndices { tx_count: 3, ..Default::default() },
+                    )?;
+                    tx.put::<tables::StorageChangeSets>(
+                        block_number_address(block),
+                        storage(STORAGE_KEY),
+                    )?;
+                }
+                Ok(())
+            })
+            .unwrap();
+
+            // Run the stage to index blocks 0-10
+            run(&db, 10, None);
+
+            // Verify RocksDB has all blocks
+            let rocksdb = db.factory.rocksdb_provider();
+            let before_unwind = rocksdb.get::<tables::StoragesHistory>(shard(u64::MAX)).unwrap();
+            assert!(before_unwind.is_some(), "RocksDB should have data before unwind");
+            let before_blocks: Vec<u64> = before_unwind.unwrap().iter().collect();
+            assert_eq!(before_blocks, (0..=10).collect::<Vec<u64>>(), "Should have blocks 0-10");
+
+            // Unwind to block 5
+            unwind(&db, 10, 5);
+
+            // Verify RocksDB only contains blocks 0-5 (blocks <= unwind_to are kept)
+            let rocksdb = db.factory.rocksdb_provider();
+            let after_unwind = rocksdb.get::<tables::StoragesHistory>(shard(u64::MAX)).unwrap();
+            assert!(after_unwind.is_some(), "RocksDB should still have data after partial unwind");
+            let after_blocks: Vec<u64> = after_unwind.unwrap().iter().collect();
+            assert_eq!(
+                after_blocks,
+                (0..=5).collect::<Vec<u64>>(),
+                "After unwind to 5, should have blocks 0-5"
+            );
+
+            // Verify the MDBX table is still empty
+            let mdbx_count = db.table::<tables::StoragesHistory>().unwrap().len();
+            assert_eq!(mdbx_count, 0, "MDBX should remain empty during RocksDB unwind");
+        }
+
+        /// Test unwind to genesis removes all storage history from RocksDB.
+        #[tokio::test]
+        async fn unwind_to_genesis_clears_rocksdb_storage_history() {
+            let db = TestStageDB::default();
+
+            // Enable RocksDB for storage history
+            db.factory.set_storage_settings_cache(
+                StorageSettings::legacy().with_storages_history_in_rocksdb(true),
+            );
+
+            // setup changesets for blocks 0-5
+            db.commit(|tx| {
+                for block in 0..=5u64 {
+                    tx.put::<tables::BlockBodyIndices>(
+                        block,
+                        StoredBlockBodyIndices { tx_count: 3, ..Default::default() },
+                    )?;
+                    tx.put::<tables::StorageChangeSets>(
+                        block_number_address(block),
+                        storage(STORAGE_KEY),
+                    )?;
+                }
+                Ok(())
+            })
+            .unwrap();
+
+            // Run the stage
+            run(&db, 5, None);
+
+            // Verify RocksDB has data
+            let rocksdb = db.factory.rocksdb_provider();
+            let before = rocksdb.get::<tables::StoragesHistory>(shard(u64::MAX)).unwrap();
+            assert!(before.is_some(), "RocksDB should have data before unwind");
+
+            // Unwind to genesis (block 0)
+            unwind(&db, 5, 0);
+
+            // Verify RocksDB still has block 0 (blocks <= unwind_to are kept)
+            let rocksdb = db.factory.rocksdb_provider();
+            let after = rocksdb.get::<tables::StoragesHistory>(shard(u64::MAX)).unwrap();
+            assert!(after.is_some(), "RocksDB should still have block 0 after unwind to genesis");
+            let after_blocks: Vec<u64> = after.unwrap().iter().collect();
+            assert_eq!(after_blocks, vec![0], "After unwind to 0, should only have block 0");
+        }
+
+        /// Test unwind with no changesets in range is a no-op.
+        #[tokio::test]
+        async fn unwind_with_no_changesets_is_noop() {
+            let db = TestStageDB::default();
+
+            // Enable RocksDB for storage history
+            db.factory.set_storage_settings_cache(
+                StorageSettings::legacy().with_storages_history_in_rocksdb(true),
+            );
+
+            // Setup changesets only for blocks 0-5 (leave a gap at 6-10)
+            db.commit(|tx| {
+                for block in 0..=5u64 {
+                    tx.put::<tables::BlockBodyIndices>(
+                        block,
+                        StoredBlockBodyIndices { tx_count: 3, ..Default::default() },
+                    )?;
+                    tx.put::<tables::StorageChangeSets>(
+                        block_number_address(block),
+                        storage(STORAGE_KEY),
+                    )?;
+                }
+                // Add block body indices for 6-10 but NO changesets
+                for block in 6..=10u64 {
+                    tx.put::<tables::BlockBodyIndices>(
+                        block,
+                        StoredBlockBodyIndices { tx_count: 3, ..Default::default() },
+                    )?;
+                }
+                Ok(())
+            })
+            .unwrap();
+
+            // Run the stage to index blocks 0-10
+            run(&db, 10, None);
+
+            // Record the state before unwind
+            let rocksdb = db.factory.rocksdb_provider();
+            let before = rocksdb.get::<tables::StoragesHistory>(shard(u64::MAX)).unwrap();
+            let before_blocks: Vec<u64> = before.unwrap().iter().collect();
+
+            // Unwind from 10 to 7 (range 8-10 has no changesets)
+            unwind(&db, 10, 7);
+
+            // Verify the RocksDB data is unchanged (no changesets in the unwind range)
+            let rocksdb = db.factory.rocksdb_provider();
+            let after = rocksdb.get::<tables::StoragesHistory>(shard(u64::MAX)).unwrap();
+            let after_blocks: Vec<u64> = after.unwrap().iter().collect();
+            assert_eq!(
+                before_blocks, after_blocks,
+                "Data should be unchanged when no changesets in unwind range"
+            );
+        }
+
+        /// Test unwind preserves unrelated storage keys.
+        #[tokio::test]
+        async fn unwind_preserves_unrelated_storage_keys() {
+            let db = TestStageDB::default();
+
+            // Enable RocksDB for storage history
+            db.factory.set_storage_settings_cache(
+                StorageSettings::legacy().with_storages_history_in_rocksdb(true),
+            );
+
+            let key_1 = B256::with_last_byte(1);
+            let key_2 = B256::with_last_byte(2);
+
+            // Setup: key_1 has changes in blocks 0-10, key_2 only in blocks 0-5
+            db.commit(|tx| {
+                for block in 0..=10u64 {
+                    tx.put::<tables::BlockBodyIndices>(
+                        block,
+                        StoredBlockBodyIndices { tx_count: 3, ..Default::default() },
+                    )?;
+                    // key_1 changes in all blocks
+                    tx.put::<tables::StorageChangeSets>(
+                        block_number_address(block),
+                        storage(key_1),
+                    )?;
+                    // key_2 only changes in blocks 0-5
+                    if block <= 5 {
+                        tx.put::<tables::StorageChangeSets>(
+                            block_number_address(block),
+                            storage(key_2),
+                        )?;
+                    }
+                }
+                Ok(())
+            })
+            .unwrap();
+
+            // Run the stage
+            run(&db, 10, None);
+
+            // Record key_2 state before unwind
+            let rocksdb = db.factory.rocksdb_provider();
+            let shard_key_2 = StorageShardedKey {
+                address: ADDRESS,
+                sharded_key: ShardedKey { key: key_2, highest_block_number: u64::MAX },
+            };
+            let before_2 = rocksdb.get::<tables::StoragesHistory>(shard_key_2.clone()).unwrap();
+            let before_2_blocks: Vec<u64> = before_2.unwrap().iter().collect();
+
+            // Unwind from 10 to 7 (only key_1 has changes in 8-10)
+            unwind(&db, 10, 7);
+
+            // Verify key_2 is unchanged (no changes in the unwind range)
+            let rocksdb = db.factory.rocksdb_provider();
+            let after_2 = rocksdb.get::<tables::StoragesHistory>(shard_key_2).unwrap();
+            let after_2_blocks: Vec<u64> = after_2.unwrap().iter().collect();
+            assert_eq!(
+                before_2_blocks, after_2_blocks,
+                "Key 2 should be unchanged when it has no changes in unwind range"
+            );
+
+            // Verify key_1 was properly unwound
+            let shard_key_1 = StorageShardedKey {
+                address: ADDRESS,
+                sharded_key: ShardedKey { key: key_1, highest_block_number: u64::MAX },
+            };
+            let after_1 = rocksdb.get::<tables::StoragesHistory>(shard_key_1).unwrap();
+            let after_1_blocks: Vec<u64> = after_1.unwrap().iter().collect();
+            assert_eq!(
+                after_1_blocks,
+                (0..=7).collect::<Vec<u64>>(),
+                "Key 1 should have blocks 0-7 after unwind"
+            );
+        }
+
+        /// Test unwind deletes the entry when all blocks are removed.
+        #[tokio::test]
+        async fn unwind_deletes_entry_when_all_blocks_removed() {
+            let db = TestStageDB::default();
+
+            // Enable RocksDB for storage history
+            db.factory.set_storage_settings_cache(
+                StorageSettings::legacy().with_storages_history_in_rocksdb(true),
+            );
+
+            // Setup: only add changesets for blocks 5-10
+            db.commit(|tx| {
+                for block in 0..=10u64 {
+                    tx.put::<tables::BlockBodyIndices>(
+                        block,
+                        StoredBlockBodyIndices { tx_count: 3, ..Default::default() },
+                    )?;
+                    if block >= 5 {
+                        tx.put::<tables::StorageChangeSets>(
+                            block_number_address(block),
+                            storage(STORAGE_KEY),
+                        )?;
+                    }
+                }
+                Ok(())
+            })
+            .unwrap();
+
+            // Run the stage
+            run(&db, 10, None);
+
+            // Verify RocksDB has data
+            let rocksdb = db.factory.rocksdb_provider();
+            let before = rocksdb.get::<tables::StoragesHistory>(shard(u64::MAX)).unwrap();
+            assert!(before.is_some(), "RocksDB should have data before unwind");
+
+            // Unwind to block 4 (removes blocks 5-10, which is ALL the data)
+            unwind(&db, 10, 4);
+
+            // Verify the entry is deleted (no blocks left)
+            let rocksdb = db.factory.rocksdb_provider();
+            let after = rocksdb.get::<tables::StoragesHistory>(shard(u64::MAX)).unwrap();
+            assert!(after.is_none(), "Entry should be deleted when all blocks are removed");
+        }
+    }
 }
diff --git a/crates/stages/stages/src/stages/utils.rs b/crates/stages/stages/src/stages/utils.rs
index f4bb960e7a..938f77ba6f 100644
--- a/crates/stages/stages/src/stages/utils.rs
+++ b/crates/stages/stages/src/stages/utils.rs
@@ -9,6 +9,10 @@ use reth_db_api::{
     BlockNumberList, DatabaseError,
 };
 use reth_etl::Collector;
+#[cfg(all(unix, feature = "rocksdb"))]
+use reth_provider::providers::{RocksDBBatch, RocksDBProvider};
+#[cfg(all(unix, feature = "rocksdb"))]
+use reth_provider::EitherWriter;
 use reth_provider::{
     providers::StaticFileProvider, BlockReader, DBProvider, ProviderError,
     StaticFileProviderFactory,
@@ -247,6 +251,331 @@ impl LoadMode {
     }
 }
 
+/// Deletes all entries in a `RocksDB` table by enqueuing delete operations in the provided batch.
+#[cfg(all(unix, feature = "rocksdb"))]
+pub(crate) fn clear_rocksdb_table<T: reth_db_api::table::Table>(
+    rocksdb: &RocksDBProvider,
+    batch: &mut RocksDBBatch<'_>,
+) -> Result<(), StageError> {
+    for entry in rocksdb.iter::<T>()? {
+        let (key, _value) = entry?;
+        batch.delete::<T>(key)?;
+    }
+    Ok(())
+}
+
+/// Load storage history indices through an [`EitherWriter`], which routes writes to either the
+/// database or `RocksDB`.
+///
+/// This is similar to [`load_history_indices`] but uses `EitherWriter` for flexible storage
+/// backend routing. The sharding logic is identical: indices are grouped by partial key,
+/// shards are cut when they reach `NUM_OF_INDICES_IN_SHARD`, and the last shard uses
+/// `u64::MAX` as the highest-block-number sentinel.
+///
+/// Note: for `RocksDB`, when `append_only == false`, this function reads existing shards
+/// from `RocksDB` and merges them with the new indices, matching the MDBX behavior.
+#[cfg(all(unix, feature = "rocksdb"))]
+pub(crate) fn load_storage_history_indices_via_writer<CURSOR, N, Provider>(
+    writer: &mut EitherWriter<'_, CURSOR, N>,
+    mut collector: reth_etl::Collector<
+        reth_db_api::models::storage_sharded_key::StorageShardedKey,
+        BlockNumberList,
+    >,
+    append_only: bool,
+    rocksdb_provider: &Provider,
+) -> Result<(), StageError>
+where
+    CURSOR: DbCursorRW<tables::StoragesHistory> + DbCursorRO<tables::StoragesHistory>,
+    N: reth_primitives_traits::NodePrimitives,
+    Provider: reth_provider::RocksDBProviderFactory,
+{
+    use reth_db_api::{models::storage_sharded_key::StorageShardedKey, table::Decode};
+
+    type PartialKey = (alloy_primitives::Address, alloy_primitives::B256);
+
+    let mut current_partial = PartialKey::default();
+    let mut current_list = Vec::<u64>::new();
+
+    let total_entries = collector.len();
+    let interval = (total_entries / 10).max(1);
+
+    for (index, element) in collector.iter()?.enumerate() {
+        let (k, v) = element?;
+        let sharded_key = StorageShardedKey::decode_owned(k)?;
+        let new_list = BlockNumberList::decompress_owned(v)?;
+
+        if index > 0 && index.is_multiple_of(interval) && total_entries > 10 {
+            info!(target: "sync::stages::index_storage_history", progress = %format!("{:.2}%", (index as f64 / total_entries as f64) * 100.0), "Writing indices via EitherWriter");
+        }
+
+        let partial_key: PartialKey = (sharded_key.address, sharded_key.sharded_key.key);
+
+        if current_partial != partial_key {
+            flush_storage_shards(writer, current_partial, &mut current_list, append_only, true)?;
+            current_partial = partial_key;
+            current_list.clear();
+
+            // If it's not the first sync, there might already be an existing shard in RocksDB,
+            // so we need to merge it with the one coming from the collector
+            if !append_only {
+                let rocksdb = rocksdb_provider.rocksdb_provider();
+                let key = StorageShardedKey::new(partial_key.0, partial_key.1, u64::MAX);
+                if let Some(existing_list) = rocksdb.get::<tables::StoragesHistory>(key)? {
+                    current_list.extend(existing_list.iter());
+                }
+            }
+        }
+
+        current_list.extend(new_list.iter());
+        flush_storage_shards(writer, current_partial, &mut current_list, append_only, false)?;
+    }
+
+    flush_storage_shards(writer, current_partial, &mut current_list, append_only, true)?;
+
+    Ok(())
+}
+
+#[cfg(all(unix, feature = "rocksdb"))]
+fn flush_storage_shards<CURSOR, N>(
+    writer: &mut EitherWriter<'_, CURSOR, N>,
+    partial_key: (alloy_primitives::Address, alloy_primitives::B256),
+    list: &mut Vec<u64>,
+    append_only: bool,
+    flush: bool,
+) -> Result<(), StageError>
+where
+    CURSOR: DbCursorRW<tables::StoragesHistory> + DbCursorRO<tables::StoragesHistory>,
+    N: reth_primitives_traits::NodePrimitives,
+{
+    use reth_db_api::models::storage_sharded_key::StorageShardedKey;
+
+    if list.len() > NUM_OF_INDICES_IN_SHARD || flush {
+        let chunks =
+            list.chunks(NUM_OF_INDICES_IN_SHARD).map(|c| c.to_vec()).collect::<Vec<Vec<u64>>>();
+
+        let mut iter = chunks.into_iter().peekable();
+        while let Some(chunk) = iter.next() {
+            let mut highest = *chunk.last().expect("at least one index");
+
+            if !flush && iter.peek().is_none() {
+                // Keep the unfinished tail shard in memory for the next round
+                *list = chunk;
+            } else {
+                if iter.peek().is_none() {
+                    highest = u64::MAX;
+                }
+                let key = StorageShardedKey::new(partial_key.0, partial_key.1, highest);
+                let value = BlockNumberList::new_pre_sorted(chunk);
+                writer.put_storage_history(key, &value, append_only)?;
+            }
+        }
+    }
+
+    Ok(())
+}
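The flush helpers above encode the shard-cutting policy: full shards are keyed by their highest block number, the unfinished tail is carried in memory between calls, and only a final flush writes the tail under the `u64::MAX` sentinel. A self-contained model of that policy (shard capacity shrunk to 3 for readability; the `len > capacity || flush` entry guard is elided):

```rust
const SHARD: usize = 3;

/// Returns (shards written as (key, blocks) pairs, tail kept in memory).
fn split(list: &[u64], flush: bool) -> (Vec<(u64, Vec<u64>)>, Vec<u64>) {
    let mut written = Vec::new();
    let mut tail = Vec::new();
    let mut chunks = list.chunks(SHARD).peekable();
    while let Some(chunk) = chunks.next() {
        let last = chunks.peek().is_none();
        if last && !flush {
            // Unfinished tail: carried into the next call instead of written.
            tail = chunk.to_vec();
        } else {
            // Full shards are keyed by their highest block; the final shard
            // of a flush gets the u64::MAX sentinel key.
            let key = if last { u64::MAX } else { *chunk.last().unwrap() };
            written.push((key, chunk.to_vec()));
        }
    }
    (written, tail)
}

// split(&[1, 2, 3, 4, 5, 6, 7], false) writes shards keyed 3 and 6 and keeps
// [7] buffered; with flush = true, the [7] shard is written under u64::MAX.
```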
+
+/// Load account history indices through an [`EitherWriter`], which routes writes to either the
+/// database or `RocksDB`.
+///
+/// Note: for `RocksDB`, when `append_only == false`, this function reads existing shards
+/// from `RocksDB` and merges them with the new indices, matching the MDBX behavior.
+#[cfg(all(unix, feature = "rocksdb"))]
+pub(crate) fn load_account_history_indices_via_writer<CURSOR, N, Provider>(
+    writer: &mut EitherWriter<'_, CURSOR, N>,
+    mut collector: reth_etl::Collector<
+        reth_db_api::models::ShardedKey<alloy_primitives::Address>,
+        BlockNumberList,
+    >,
+    append_only: bool,
+    rocksdb_provider: &Provider,
+) -> Result<(), StageError>
+where
+    CURSOR: DbCursorRW<tables::AccountsHistory> + DbCursorRO<tables::AccountsHistory>,
+    N: reth_primitives_traits::NodePrimitives,
+    Provider: reth_provider::RocksDBProviderFactory,
+{
+    use reth_db_api::{models::ShardedKey, table::Decode};
+
+    let mut current_partial = alloy_primitives::Address::default();
+    let mut current_list = Vec::<u64>::new();
+
+    let total_entries = collector.len();
+    let interval = (total_entries / 10).max(1);
+
+    for (index, element) in collector.iter()?.enumerate() {
+        let (k, v) = element?;
+        let sharded_key = ShardedKey::<alloy_primitives::Address>::decode_owned(k)?;
+        let new_list = BlockNumberList::decompress_owned(v)?;
+
+        if index > 0 && index.is_multiple_of(interval) && total_entries > 10 {
+            info!(target: "sync::stages::index_account_history", progress = %format!("{:.2}%", (index as f64 / total_entries as f64) * 100.0), "Writing indices via EitherWriter");
+        }
+
+        let partial_key = sharded_key.key;
+
+        if current_partial != partial_key {
+            flush_account_shards(writer, current_partial, &mut current_list, append_only, true)?;
+            current_partial = partial_key;
+            current_list.clear();
+
+            // If it's not the first sync, there might already be an existing shard in RocksDB,
+            // so we need to merge it with the one coming from the collector
+            if !append_only {
+                let rocksdb = rocksdb_provider.rocksdb_provider();
+                let key = ShardedKey::new(partial_key, u64::MAX);
+                if let Some(existing_list) = rocksdb.get::<tables::AccountsHistory>(key)? {
+                    current_list.extend(existing_list.iter());
+                }
+            }
+        }
+
+        current_list.extend(new_list.iter());
+        flush_account_shards(writer, current_partial, &mut current_list, append_only, false)?;
+    }
+
+    flush_account_shards(writer, current_partial, &mut current_list, append_only, true)?;
+
+    Ok(())
+}
+
+#[cfg(all(unix, feature = "rocksdb"))]
+fn flush_account_shards<CURSOR, N>(
+    writer: &mut EitherWriter<'_, CURSOR, N>,
+    partial_key: alloy_primitives::Address,
+    list: &mut Vec<u64>,
+    append_only: bool,
+    flush: bool,
+) -> Result<(), StageError>
+where
+    CURSOR: DbCursorRW<tables::AccountsHistory> + DbCursorRO<tables::AccountsHistory>,
+    N: reth_primitives_traits::NodePrimitives,
+{
+    use reth_db_api::models::ShardedKey;
+
+    if list.len() > NUM_OF_INDICES_IN_SHARD || flush {
+        let chunks =
+            list.chunks(NUM_OF_INDICES_IN_SHARD).map(|c| c.to_vec()).collect::<Vec<Vec<u64>>>();
+
+        let mut iter = chunks.into_iter().peekable();
+        while let Some(chunk) = iter.next() {
+            let mut highest = *chunk.last().expect("at least one index");
+
+            if !flush && iter.peek().is_none() {
+                // Keep the unfinished tail shard in memory for the next round
+                *list = chunk;
+            } else {
+                if iter.peek().is_none() {
+                    highest = u64::MAX;
+                }
+                let key = ShardedKey::new(partial_key, highest);
+                let value = BlockNumberList::new_pre_sorted(chunk);
+                writer.put_account_history(key, &value, append_only)?;
+            }
+        }
+    }
+
+    Ok(())
+}
+
+/// Generic helper to unwind history indices using `RocksDB`.
+///
+/// For each affected partial key:
+/// 1. Reads the existing shard from `RocksDB` (the one with the `u64::MAX` sentinel)
+/// 2. Filters out block numbers >= `first_block_to_remove`
+/// 3. If the result is empty, deletes the entry
+/// 4. Otherwise, writes back the truncated shard
+///
+/// Note: only the sentinel shard (`u64::MAX`) is touched. That is sufficient for typical
+/// unwind operations, where recently added blocks always live in the sentinel shard.
+#[cfg(all(unix, feature = "rocksdb"))]
+fn unwind_history_via_rocksdb_generic<Provider, T, PK, K>(
+    provider: &Provider,
+    affected_keys: impl IntoIterator<Item = PK>,
+    first_block_to_remove: BlockNumber,
+    mk_sentinel_key: impl Fn(PK) -> K,
+) -> Result<usize, StageError>
+where
+    Provider: DBProvider + reth_provider::RocksDBProviderFactory,
+    T: reth_db_api::table::Table<Key = K, Value = BlockNumberList>,
+    K: Clone,
+{
+    let rocksdb = provider.rocksdb_provider();
+    let mut batch = rocksdb.batch();
+    let mut count = 0;
+
+    for pk in affected_keys {
+        let key = mk_sentinel_key(pk);
+        if let Some(existing_list) = rocksdb.get::<T>(key.clone())? {
+            // Keep blocks < first_block_to_remove (matches the MDBX semantics)
+            let filtered: Vec<u64> =
+                existing_list.iter().filter(|&block| block < first_block_to_remove).collect();
+
+            if filtered.is_empty() {
+                batch.delete::<T>(key)?;
+            } else {
+                let new_list = BlockNumberList::new_pre_sorted(filtered);
+                batch.put::<T>(key, &new_list)?;
+            }
+            count += 1;
+        }
+    }
+
+    provider.set_pending_rocksdb_batch(batch.into_inner());
+    Ok(count)
+}
+
+/// Unwind storage history indices using `RocksDB`.
+///
+/// Takes a set of affected (address, `storage_key`) pairs and removes all block numbers
+/// >= `first_block_to_remove` from the history indices.
+#[cfg(all(unix, feature = "rocksdb"))]
+pub(crate) fn unwind_storage_history_via_rocksdb<Provider>(
+    provider: &Provider,
+    affected_keys: std::collections::BTreeSet<(alloy_primitives::Address, alloy_primitives::B256)>,
+    first_block_to_remove: BlockNumber,
+) -> Result<usize, StageError>
+where
+    Provider: DBProvider + reth_provider::RocksDBProviderFactory,
+{
+    use reth_db_api::models::storage_sharded_key::StorageShardedKey;
+
+    unwind_history_via_rocksdb_generic::<_, reth_db_api::tables::StoragesHistory, _, _>(
+        provider,
+        affected_keys,
+        first_block_to_remove,
+        |(address, storage_key)| StorageShardedKey::new(address, storage_key, u64::MAX),
+    )
+}
+
+/// Unwind account history indices using `RocksDB`.
+///
+/// Takes a set of affected addresses and removes all block numbers
+/// >= `first_block_to_remove` from the history indices.
+#[cfg(all(unix, feature = "rocksdb"))]
+pub(crate) fn unwind_account_history_via_rocksdb<Provider>(
+    provider: &Provider,
+    affected_addresses: std::collections::BTreeSet<alloy_primitives::Address>,
+    first_block_to_remove: BlockNumber,
+) -> Result<usize, StageError>
+where
+    Provider: DBProvider + reth_provider::RocksDBProviderFactory,
+{
+    use reth_db_api::models::ShardedKey;
+
+    unwind_history_via_rocksdb_generic::<_, reth_db_api::tables::AccountsHistory, _, _>(
+        provider,
+        affected_addresses,
+        first_block_to_remove,
+        |address| ShardedKey::new(address, u64::MAX),
+    )
+}
+
 /// Called when database is ahead of static files. Attempts to find the first block we are missing
 /// transactions for.
 pub(crate) fn missing_static_data_error(
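Both unwind wrappers reduce to the same sentinel-shard rewrite: keep blocks strictly below the first removed block, delete the entry if nothing survives. A compact model, with the `BlockNumberList` stood in for by a sorted `Vec<u64>`:

```rust
/// Returns the truncated shard, or None when the entry should be deleted.
fn unwind_shard(existing: &[u64], first_block_to_remove: u64) -> Option<Vec<u64>> {
    let kept: Vec<u64> =
        existing.iter().copied().filter(|b| *b < first_block_to_remove).collect();
    // Empty shard => delete the entry; otherwise write the truncated list back.
    (!kept.is_empty()).then_some(kept)
}

// unwind_shard(&[0, 1, 2, 8, 9, 10], 8) == Some(vec![0, 1, 2])
// unwind_shard(&[8, 9, 10], 8) == None (entry deleted)
```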
diff --git a/crates/storage/provider/src/either_writer.rs b/crates/storage/provider/src/either_writer.rs
index 80ca829e6e..a37f04156a 100644
--- a/crates/storage/provider/src/either_writer.rs
+++ b/crates/storage/provider/src/either_writer.rs
@@ -364,13 +364,24 @@ where
     CURSOR: DbCursorRW<tables::StoragesHistory> + DbCursorRO<tables::StoragesHistory>,
 {
     /// Puts a storage history entry.
+    ///
+    /// When `append_only` is true, uses `cursor.append()`, which is significantly faster but
+    /// requires keys to be written in sorted order past the end of the table (as on a first
+    /// sync into a freshly cleared table). When false, uses `cursor.upsert()`, which handles
+    /// arbitrary insertion order.
     pub fn put_storage_history(
         &mut self,
         key: StorageShardedKey,
         value: &BlockNumberList,
+        append_only: bool,
     ) -> ProviderResult<()> {
         match self {
-            Self::Database(cursor) => Ok(cursor.upsert(key, value)?),
+            Self::Database(cursor) => {
+                if append_only {
+                    Ok(cursor.append(key, value)?)
+                } else {
+                    Ok(cursor.upsert(key, value)?)
+                }
+            }
             Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
             #[cfg(all(unix, feature = "rocksdb"))]
             Self::RocksDB(batch) => batch.put::<tables::StoragesHistory>(key, value),
@@ -398,13 +409,24 @@ where
     CURSOR: DbCursorRW<tables::AccountsHistory> + DbCursorRO<tables::AccountsHistory>,
 {
     /// Puts an account history entry.
+    ///
+    /// When `append_only` is true, uses `cursor.append()`, which is significantly faster but
+    /// requires keys to be written in sorted order past the end of the table (as on a first
+    /// sync into a freshly cleared table). When false, uses `cursor.upsert()`, which handles
+    /// arbitrary insertion order.
     pub fn put_account_history(
         &mut self,
         key: ShardedKey<Address>,
         value: &BlockNumberList,
+        append_only: bool,
     ) -> ProviderResult<()> {
         match self {
-            Self::Database(cursor) => Ok(cursor.upsert(key, value)?),
+            Self::Database(cursor) => {
+                if append_only {
+                    Ok(cursor.append(key, value)?)
+                } else {
+                    Ok(cursor.upsert(key, value)?)
+                }
+            }
             Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
             #[cfg(all(unix, feature = "rocksdb"))]
             Self::RocksDB(batch) => batch.put::<tables::AccountsHistory>(key, value),
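The `append_only` flag threads the stage's `first_sync` all the way down to the MDBX cursor. A toy model of why `append` is only safe there: it requires every new key to sort after the current end of the table, which holds for sorted ETL output going into a table that was just cleared, but not for incremental runs that rewrite an existing sentinel shard (the `Cursor` here is hypothetical, not reth's API):

```rust
/// Hypothetical append-only cursor over byte keys.
struct Cursor {
    last_key: Option<Vec<u8>>,
}

impl Cursor {
    fn append(&mut self, key: Vec<u8>) -> Result<(), &'static str> {
        // Appending a key that does not sort after the table end must fail;
        // this is the case an incremental run would hit, hence `upsert` there.
        if self.last_key.as_ref().is_some_and(|last| *last >= key) {
            return Err("append out of order: key must sort after the table end");
        }
        self.last_key = Some(key);
        Ok(())
    }
}
```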