From 624ddc57799c1c1bf06e0d6af3e945b57f5d3e32 Mon Sep 17 00:00:00 2001 From: YK Date: Wed, 21 Jan 2026 18:05:19 +0100 Subject: [PATCH] feat(stages): add RocksDB support for IndexStorageHistoryStage (#21175) --- .../src/stages/index_storage_history.rs | 356 +++++++++++++++-- crates/stages/stages/src/stages/utils.rs | 360 ++++++++++-------- crates/storage/provider/src/either_writer.rs | 45 ++- .../src/providers/database/provider.rs | 44 ++- .../src/providers/rocksdb/provider.rs | 173 +++++++++ 5 files changed, 770 insertions(+), 208 deletions(-) diff --git a/crates/stages/stages/src/stages/index_storage_history.rs b/crates/stages/stages/src/stages/index_storage_history.rs index 2ec4094c1e..e37dbaa441 100644 --- a/crates/stages/stages/src/stages/index_storage_history.rs +++ b/crates/stages/stages/src/stages/index_storage_history.rs @@ -1,19 +1,21 @@ -use super::{collect_history_indices, load_history_indices}; -use crate::{StageCheckpoint, StageId}; +use super::collect_history_indices; +use crate::{stages::utils::load_storage_history, StageCheckpoint, StageId}; use reth_config::config::{EtlConfig, IndexHistoryConfig}; use reth_db_api::{ models::{storage_sharded_key::StorageShardedKey, AddressStorageKey, BlockNumberAddress}, - table::Decode, tables, transaction::DbTxMut, }; -use reth_provider::{DBProvider, HistoryWriter, PruneCheckpointReader, PruneCheckpointWriter}; +use reth_provider::{ + DBProvider, EitherWriter, HistoryWriter, PruneCheckpointReader, PruneCheckpointWriter, + RocksDBProviderFactory, StorageSettingsCache, +}; use reth_prune_types::{PruneCheckpoint, PruneMode, PrunePurpose, PruneSegment}; use reth_stages_api::{ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput}; use std::fmt::Debug; use tracing::info; -/// Stage is indexing history the account changesets generated in +/// Stage is indexing history the storage changesets generated in /// [`ExecutionStage`][crate::stages::ExecutionStage]. 
For more information /// on index sharding take a look at [`tables::StoragesHistory`]. #[derive(Debug)] @@ -34,7 +36,7 @@ impl IndexStorageHistoryStage { etl_config: EtlConfig, prune_mode: Option, ) -> Self { - Self { commit_threshold: config.commit_threshold, prune_mode, etl_config } + Self { commit_threshold: config.commit_threshold, etl_config, prune_mode } } } @@ -46,8 +48,13 @@ impl Default for IndexStorageHistoryStage { impl Stage for IndexStorageHistoryStage where - Provider: - DBProvider + PruneCheckpointWriter + HistoryWriter + PruneCheckpointReader, + Provider: DBProvider + + HistoryWriter + + PruneCheckpointReader + + PruneCheckpointWriter + + StorageSettingsCache + + RocksDBProviderFactory + + reth_provider::NodePrimitivesProvider, { /// Return the id of the stage fn id(&self) -> StageId { @@ -95,15 +102,25 @@ where let mut range = input.next_block_range(); let first_sync = input.checkpoint().block_number == 0; + let use_rocksdb = provider.cached_storage_settings().storages_history_in_rocksdb; // On first sync we might have history coming from genesis. We clear the table since it's // faster to rebuild from scratch. if first_sync { - provider.tx_ref().clear::()?; + if use_rocksdb { + // Note: RocksDB clear() executes immediately (not deferred to commit like MDBX), + // but this is safe for first_sync because if we crash before commit, the + // checkpoint stays at 0 and we'll just clear and rebuild again on restart. The + // source data (changesets) is intact. 
+ #[cfg(all(unix, feature = "rocksdb"))] + provider.rocksdb_provider().clear::()?; + } else { + provider.tx_ref().clear::()?; + } range = 0..=*input.next_block_range().end(); } - info!(target: "sync::stages::index_storage_history::exec", ?first_sync, "Collecting indices"); + info!(target: "sync::stages::index_storage_history::exec", ?first_sync, ?use_rocksdb, "Collecting indices"); let collector = collect_history_indices::<_, tables::StorageChangeSets, tables::StoragesHistory, _>( provider, @@ -116,16 +133,13 @@ where )?; info!(target: "sync::stages::index_storage_history::exec", "Loading indices into database"); - load_history_indices::<_, tables::StoragesHistory, _>( - provider, - collector, - first_sync, - |AddressStorageKey((address, storage_key)), highest_block_number| { - StorageShardedKey::new(address, storage_key, highest_block_number) - }, - StorageShardedKey::decode_owned, - |key| AddressStorageKey((key.address, key.sharded_key.key)), - )?; + + provider.with_rocksdb_batch(|rocksdb_batch| { + let mut writer = EitherWriter::new_storages_history(provider, rocksdb_batch)?; + load_storage_history(collector, first_sync, &mut writer) + .map_err(|e| reth_provider::ProviderError::other(Box::new(e)))?; + Ok(((), writer.into_raw_rocksdb_batch())) + })?; Ok(ExecOutput { checkpoint: StageCheckpoint::new(*range.end()), done: true }) } @@ -382,12 +396,12 @@ mod tests { async fn insert_index_second_half_shard() { // init let db = TestStageDB::default(); - let mut close_full_list = (1..=LAST_BLOCK_IN_FULL_SHARD - 1).collect::>(); + let mut almost_full_list = (1..=LAST_BLOCK_IN_FULL_SHARD - 1).collect::>(); // setup partial_setup(&db); db.commit(|tx| { - tx.put::(shard(u64::MAX), list(&close_full_list)).unwrap(); + tx.put::(shard(u64::MAX), list(&almost_full_list)).unwrap(); Ok(()) }) .unwrap(); @@ -396,12 +410,12 @@ mod tests { run(&db, LAST_BLOCK_IN_FULL_SHARD + 1, Some(LAST_BLOCK_IN_FULL_SHARD - 1)); // verify - close_full_list.push(LAST_BLOCK_IN_FULL_SHARD); + 
almost_full_list.push(LAST_BLOCK_IN_FULL_SHARD); let table = cast(db.table::().unwrap()); assert_eq!( table, BTreeMap::from([ - (shard(LAST_BLOCK_IN_FULL_SHARD), close_full_list.clone()), + (shard(LAST_BLOCK_IN_FULL_SHARD), almost_full_list.clone()), (shard(u64::MAX), vec![LAST_BLOCK_IN_FULL_SHARD + 1]) ]) ); @@ -410,9 +424,9 @@ mod tests { unwind(&db, LAST_BLOCK_IN_FULL_SHARD, LAST_BLOCK_IN_FULL_SHARD - 1); // verify initial state - close_full_list.pop(); + almost_full_list.pop(); let table = cast(db.table::().unwrap()); - assert_eq!(table, BTreeMap::from([(shard(u64::MAX), close_full_list)])); + assert_eq!(table, BTreeMap::from([(shard(u64::MAX), almost_full_list)])); } #[tokio::test] @@ -663,4 +677,294 @@ mod tests { Ok(()) } } + + #[cfg(all(unix, feature = "rocksdb"))] + mod rocksdb_tests { + use super::*; + use reth_provider::RocksDBProviderFactory; + use reth_storage_api::StorageSettings; + + /// Test that when `storages_history_in_rocksdb` is enabled, the stage + /// writes storage history indices to `RocksDB` instead of MDBX. 
+ #[tokio::test] + async fn execute_writes_to_rocksdb_when_enabled() { + let db = TestStageDB::default(); + + db.factory.set_storage_settings_cache( + StorageSettings::legacy().with_storages_history_in_rocksdb(true), + ); + + db.commit(|tx| { + for block in 0..=10 { + tx.put::( + block, + StoredBlockBodyIndices { tx_count: 3, ..Default::default() }, + )?; + tx.put::( + block_number_address(block), + storage(STORAGE_KEY), + )?; + } + Ok(()) + }) + .unwrap(); + + let input = ExecInput { target: Some(10), ..Default::default() }; + let mut stage = IndexStorageHistoryStage::default(); + let provider = db.factory.database_provider_rw().unwrap(); + let out = stage.execute(&provider, input).unwrap(); + assert_eq!(out, ExecOutput { checkpoint: StageCheckpoint::new(10), done: true }); + provider.commit().unwrap(); + + let mdbx_table = db.table::().unwrap(); + assert!( + mdbx_table.is_empty(), + "MDBX StoragesHistory should be empty when RocksDB is enabled" + ); + + let rocksdb = db.factory.rocksdb_provider(); + let result = rocksdb.get::(shard(u64::MAX)).unwrap(); + assert!(result.is_some(), "RocksDB should contain storage history"); + + let block_list = result.unwrap(); + let blocks: Vec = block_list.iter().collect(); + assert_eq!(blocks, (0..=10).collect::>()); + } + + /// Test that unwind works correctly when `storages_history_in_rocksdb` is enabled. 
+ #[tokio::test] + async fn unwind_works_when_rocksdb_enabled() { + let db = TestStageDB::default(); + + db.factory.set_storage_settings_cache( + StorageSettings::legacy().with_storages_history_in_rocksdb(true), + ); + + db.commit(|tx| { + for block in 0..=10 { + tx.put::( + block, + StoredBlockBodyIndices { tx_count: 3, ..Default::default() }, + )?; + tx.put::( + block_number_address(block), + storage(STORAGE_KEY), + )?; + } + Ok(()) + }) + .unwrap(); + + let input = ExecInput { target: Some(10), ..Default::default() }; + let mut stage = IndexStorageHistoryStage::default(); + let provider = db.factory.database_provider_rw().unwrap(); + let out = stage.execute(&provider, input).unwrap(); + assert_eq!(out, ExecOutput { checkpoint: StageCheckpoint::new(10), done: true }); + provider.commit().unwrap(); + + let rocksdb = db.factory.rocksdb_provider(); + let result = rocksdb.get::(shard(u64::MAX)).unwrap(); + assert!(result.is_some(), "RocksDB should have data before unwind"); + let blocks_before: Vec = result.unwrap().iter().collect(); + assert_eq!(blocks_before, (0..=10).collect::>()); + + let unwind_input = + UnwindInput { checkpoint: StageCheckpoint::new(10), unwind_to: 5, bad_block: None }; + let provider = db.factory.database_provider_rw().unwrap(); + let out = stage.unwind(&provider, unwind_input).unwrap(); + assert_eq!(out, UnwindOutput { checkpoint: StageCheckpoint::new(5) }); + provider.commit().unwrap(); + + let rocksdb = db.factory.rocksdb_provider(); + let result = rocksdb.get::(shard(u64::MAX)).unwrap(); + assert!(result.is_some(), "RocksDB should still have data after partial unwind"); + let blocks_after: Vec = result.unwrap().iter().collect(); + assert_eq!( + blocks_after, + (0..=5).collect::>(), + "Should only have blocks 0-5 after unwind to block 5" + ); + } + + /// Test that unwind to block 0 keeps only block 0's history. 
+ #[tokio::test] + async fn unwind_to_zero_keeps_block_zero() { + let db = TestStageDB::default(); + + db.factory.set_storage_settings_cache( + StorageSettings::legacy().with_storages_history_in_rocksdb(true), + ); + + db.commit(|tx| { + for block in 0..=5 { + tx.put::( + block, + StoredBlockBodyIndices { tx_count: 3, ..Default::default() }, + )?; + tx.put::( + block_number_address(block), + storage(STORAGE_KEY), + )?; + } + Ok(()) + }) + .unwrap(); + + let input = ExecInput { target: Some(5), ..Default::default() }; + let mut stage = IndexStorageHistoryStage::default(); + let provider = db.factory.database_provider_rw().unwrap(); + let out = stage.execute(&provider, input).unwrap(); + assert_eq!(out, ExecOutput { checkpoint: StageCheckpoint::new(5), done: true }); + provider.commit().unwrap(); + + let rocksdb = db.factory.rocksdb_provider(); + let result = rocksdb.get::(shard(u64::MAX)).unwrap(); + assert!(result.is_some(), "RocksDB should have data before unwind"); + + let unwind_input = + UnwindInput { checkpoint: StageCheckpoint::new(5), unwind_to: 0, bad_block: None }; + let provider = db.factory.database_provider_rw().unwrap(); + let out = stage.unwind(&provider, unwind_input).unwrap(); + assert_eq!(out, UnwindOutput { checkpoint: StageCheckpoint::new(0) }); + provider.commit().unwrap(); + + let rocksdb = db.factory.rocksdb_provider(); + let result = rocksdb.get::(shard(u64::MAX)).unwrap(); + assert!(result.is_some(), "RocksDB should still have block 0 history"); + let blocks_after: Vec = result.unwrap().iter().collect(); + assert_eq!(blocks_after, vec![0], "Should only have block 0 after unwinding to 0"); + } + + /// Test incremental sync merges new data with existing shards. 
+ #[tokio::test] + async fn execute_incremental_sync() { + let db = TestStageDB::default(); + + db.factory.set_storage_settings_cache( + StorageSettings::legacy().with_storages_history_in_rocksdb(true), + ); + + db.commit(|tx| { + for block in 0..=5 { + tx.put::( + block, + StoredBlockBodyIndices { tx_count: 3, ..Default::default() }, + )?; + tx.put::( + block_number_address(block), + storage(STORAGE_KEY), + )?; + } + Ok(()) + }) + .unwrap(); + + let input = ExecInput { target: Some(5), ..Default::default() }; + let mut stage = IndexStorageHistoryStage::default(); + let provider = db.factory.database_provider_rw().unwrap(); + let out = stage.execute(&provider, input).unwrap(); + assert_eq!(out, ExecOutput { checkpoint: StageCheckpoint::new(5), done: true }); + provider.commit().unwrap(); + + let rocksdb = db.factory.rocksdb_provider(); + let result = rocksdb.get::(shard(u64::MAX)).unwrap(); + assert!(result.is_some()); + let blocks: Vec = result.unwrap().iter().collect(); + assert_eq!(blocks, (0..=5).collect::>()); + + db.commit(|tx| { + for block in 6..=10 { + tx.put::( + block, + StoredBlockBodyIndices { tx_count: 3, ..Default::default() }, + )?; + tx.put::( + block_number_address(block), + storage(STORAGE_KEY), + )?; + } + Ok(()) + }) + .unwrap(); + + let input = ExecInput { target: Some(10), checkpoint: Some(StageCheckpoint::new(5)) }; + let provider = db.factory.database_provider_rw().unwrap(); + let out = stage.execute(&provider, input).unwrap(); + assert_eq!(out, ExecOutput { checkpoint: StageCheckpoint::new(10), done: true }); + provider.commit().unwrap(); + + let rocksdb = db.factory.rocksdb_provider(); + let result = rocksdb.get::(shard(u64::MAX)).unwrap(); + assert!(result.is_some(), "RocksDB should have merged data"); + let blocks: Vec = result.unwrap().iter().collect(); + assert_eq!(blocks, (0..=10).collect::>()); + } + + /// Test multi-shard unwind correctly handles shards that span across unwind boundary. 
+ #[tokio::test] + async fn unwind_multi_shard() { + use reth_db_api::models::sharded_key::NUM_OF_INDICES_IN_SHARD; + + let db = TestStageDB::default(); + + db.factory.set_storage_settings_cache( + StorageSettings::legacy().with_storages_history_in_rocksdb(true), + ); + + let num_blocks = (NUM_OF_INDICES_IN_SHARD * 2 + 100) as u64; + + db.commit(|tx| { + for block in 0..num_blocks { + tx.put::( + block, + StoredBlockBodyIndices { tx_count: 3, ..Default::default() }, + )?; + tx.put::( + block_number_address(block), + storage(STORAGE_KEY), + )?; + } + Ok(()) + }) + .unwrap(); + + let input = ExecInput { target: Some(num_blocks - 1), ..Default::default() }; + let mut stage = IndexStorageHistoryStage::default(); + let provider = db.factory.database_provider_rw().unwrap(); + let out = stage.execute(&provider, input).unwrap(); + assert_eq!( + out, + ExecOutput { checkpoint: StageCheckpoint::new(num_blocks - 1), done: true } + ); + provider.commit().unwrap(); + + let rocksdb = db.factory.rocksdb_provider(); + let shards = rocksdb.storage_history_shards(ADDRESS, STORAGE_KEY).unwrap(); + assert!(shards.len() >= 2, "Should have at least 2 shards for {} blocks", num_blocks); + + let unwind_to = NUM_OF_INDICES_IN_SHARD as u64 + 50; + let unwind_input = UnwindInput { + checkpoint: StageCheckpoint::new(num_blocks - 1), + unwind_to, + bad_block: None, + }; + let provider = db.factory.database_provider_rw().unwrap(); + let out = stage.unwind(&provider, unwind_input).unwrap(); + assert_eq!(out, UnwindOutput { checkpoint: StageCheckpoint::new(unwind_to) }); + provider.commit().unwrap(); + + let rocksdb = db.factory.rocksdb_provider(); + let shards_after = rocksdb.storage_history_shards(ADDRESS, STORAGE_KEY).unwrap(); + assert!(!shards_after.is_empty(), "Should still have shards after unwind"); + + let all_blocks: Vec = + shards_after.iter().flat_map(|(_, list)| list.iter()).collect(); + assert_eq!( + all_blocks, + (0..=unwind_to).collect::>(), + "Should only have blocks 0 to {} 
after unwind", + unwind_to + ); + } + } } diff --git a/crates/stages/stages/src/stages/utils.rs b/crates/stages/stages/src/stages/utils.rs index 93158a62ed..c5a8dee347 100644 --- a/crates/stages/stages/src/stages/utils.rs +++ b/crates/stages/stages/src/stages/utils.rs @@ -1,12 +1,15 @@ //! Utils for `stages`. -use alloy_primitives::{Address, BlockNumber, TxNumber}; +use alloy_primitives::{Address, BlockNumber, TxNumber, B256}; use reth_config::config::EtlConfig; use reth_db_api::{ cursor::{DbCursorRO, DbCursorRW}, - models::{sharded_key::NUM_OF_INDICES_IN_SHARD, AccountBeforeTx, ShardedKey}, + models::{ + sharded_key::NUM_OF_INDICES_IN_SHARD, storage_sharded_key::StorageShardedKey, + AccountBeforeTx, ShardedKey, + }, table::{Decode, Decompress, Table}, - transaction::{DbTx, DbTxMut}, - BlockNumberList, DatabaseError, + transaction::DbTx, + BlockNumberList, }; use reth_etl::Collector; use reth_primitives_traits::NodePrimitives; @@ -171,164 +174,9 @@ where Ok(collector) } -/// Given a [`Collector`] created by [`collect_history_indices`] it iterates all entries, loading -/// the indices into the database in shards. -/// -/// ## Process -/// Iterates over elements, grouping indices by their partial keys (e.g., `Address` or -/// `Address.StorageKey`). It flushes indices to disk when reaching a shard's max length -/// (`NUM_OF_INDICES_IN_SHARD`) or when the partial key changes, ensuring the last previous partial -/// key shard is stored. 
-pub(crate) fn load_history_indices( - provider: &Provider, - mut collector: Collector, - append_only: bool, - sharded_key_factory: impl Clone + Fn(P, u64) -> ::Key, - decode_key: impl Fn(Vec) -> Result<::Key, DatabaseError>, - get_partial: impl Fn(::Key) -> P, -) -> Result<(), StageError> -where - Provider: DBProvider, - H: Table, - P: Copy + Default + Eq, -{ - let mut write_cursor = provider.tx_ref().cursor_write::()?; - let mut current_partial = None; - let mut current_list = Vec::::new(); - - // observability - let total_entries = collector.len(); - let interval = (total_entries / 10).max(1); - - for (index, element) in collector.iter()?.enumerate() { - let (k, v) = element?; - let sharded_key = decode_key(k)?; - let new_list = BlockNumberList::decompress_owned(v)?; - - if index > 0 && index.is_multiple_of(interval) && total_entries > 10 { - info!(target: "sync::stages::index_history", progress = %format!("{:.2}%", (index as f64 / total_entries as f64) * 100.0), "Writing indices"); - } - - // AccountsHistory: `Address`. - // StorageHistory: `Address.StorageKey`. - let partial_key = get_partial(sharded_key); - - if current_partial != Some(partial_key) { - // We have reached the end of this subset of keys so - // we need to flush its last indice shard. - if let Some(current) = current_partial { - load_indices( - &mut write_cursor, - current, - &mut current_list, - &sharded_key_factory, - append_only, - LoadMode::Flush, - )?; - } - - current_partial = Some(partial_key); - current_list.clear(); - - // If it's not the first sync, there might an existing shard already, so we need to - // merge it with the one coming from the collector - if !append_only && - let Some((_, last_database_shard)) = - write_cursor.seek_exact(sharded_key_factory(partial_key, u64::MAX))? 
- { - current_list.extend(last_database_shard.iter()); - } - } - - current_list.extend(new_list.iter()); - load_indices( - &mut write_cursor, - partial_key, - &mut current_list, - &sharded_key_factory, - append_only, - LoadMode::KeepLast, - )?; - } - - // There will be one remaining shard that needs to be flushed to DB. - if let Some(current) = current_partial { - load_indices( - &mut write_cursor, - current, - &mut current_list, - &sharded_key_factory, - append_only, - LoadMode::Flush, - )?; - } - - Ok(()) -} - -/// Shard and insert the indices list according to [`LoadMode`] and its length. -pub(crate) fn load_indices( - cursor: &mut C, - partial_key: P, - list: &mut Vec, - sharded_key_factory: &impl Fn(P, BlockNumber) -> ::Key, - append_only: bool, - mode: LoadMode, -) -> Result<(), StageError> -where - C: DbCursorRO + DbCursorRW, - H: Table, - P: Copy, -{ - if list.len() > NUM_OF_INDICES_IN_SHARD || mode.is_flush() { - let chunks = list - .chunks(NUM_OF_INDICES_IN_SHARD) - .map(|chunks| chunks.to_vec()) - .collect::>>(); - - let mut iter = chunks.into_iter().peekable(); - while let Some(chunk) = iter.next() { - let mut highest = *chunk.last().expect("at least one index"); - - if !mode.is_flush() && iter.peek().is_none() { - *list = chunk; - } else { - if iter.peek().is_none() { - highest = u64::MAX; - } - let key = sharded_key_factory(partial_key, highest); - let value = BlockNumberList::new_pre_sorted(chunk); - - if append_only { - cursor.append(key, &value)?; - } else { - cursor.upsert(key, &value)?; - } - } - } - } - - Ok(()) -} - -/// Mode on how to load index shards into the database. -pub(crate) enum LoadMode { - /// Keep the last shard in memory and don't flush it to the database. - KeepLast, - /// Flush all shards into the database. - Flush, -} - -impl LoadMode { - const fn is_flush(&self) -> bool { - matches!(self, Self::Flush) - } -} - /// Loads account history indices into the database via `EitherWriter`. 
/// -/// Similar to [`load_history_indices`] but works with [`EitherWriter`] to support -/// both MDBX and `RocksDB` backends. +/// Works with [`EitherWriter`] to support both MDBX and `RocksDB` backends. /// /// ## Process /// Iterates over elements, grouping indices by their address. It flushes indices to disk @@ -404,8 +252,6 @@ where /// Only flushes when we have more than one shard's worth of data, keeping the last /// (possibly partial) shard for continued accumulation. This avoids writing a shard /// that may need to be updated when more indices arrive. -/// -/// Equivalent to [`load_indices`] with [`LoadMode::KeepLast`]. fn flush_account_history_shards_partial( address: Address, list: &mut Vec, @@ -462,8 +308,6 @@ where /// /// The `u64::MAX` key for the final shard is an invariant that allows `seek_exact(address, /// u64::MAX)` to find the last shard during incremental sync for merging with new indices. -/// -/// Equivalent to [`load_indices`] with [`LoadMode::Flush`]. fn flush_account_history_shards( address: Address, list: &mut Vec, @@ -537,3 +381,191 @@ where segment, }) } + +/// Loads storage history indices into the database via `EitherWriter`. +/// +/// Works with [`EitherWriter`] to support both MDBX and `RocksDB` backends. +/// +/// ## Process +/// Iterates over elements, grouping indices by their (address, `storage_key`) pairs. It flushes +/// indices to disk when reaching a shard's max length (`NUM_OF_INDICES_IN_SHARD`) or when the +/// (address, `storage_key`) pair changes, ensuring the last previous shard is stored. +/// +/// Uses `Option<(Address, B256)>` instead of default values as the sentinel to avoid +/// incorrectly treating `(Address::ZERO, B256::ZERO)` as "no previous key". 
+pub(crate) fn load_storage_history( + mut collector: Collector, + append_only: bool, + writer: &mut EitherWriter<'_, CURSOR, N>, +) -> Result<(), StageError> +where + N: NodePrimitives, + CURSOR: DbCursorRW + + DbCursorRO, +{ + let mut current_key: Option<(Address, B256)> = None; + // Accumulator for block numbers where the current (address, storage_key) changed. + let mut current_list = Vec::::new(); + + let total_entries = collector.len(); + let interval = (total_entries / 10).max(1); + + for (index, element) in collector.iter()?.enumerate() { + let (k, v) = element?; + let sharded_key = StorageShardedKey::decode_owned(k)?; + let new_list = BlockNumberList::decompress_owned(v)?; + + if index > 0 && index.is_multiple_of(interval) && total_entries > 10 { + info!(target: "sync::stages::index_history", progress = %format!("{:.2}%", (index as f64 / total_entries as f64) * 100.0), "Writing indices"); + } + + let partial_key = (sharded_key.address, sharded_key.sharded_key.key); + + // When (address, storage_key) changes, flush the previous key's shards and start fresh. + if current_key != Some(partial_key) { + // Flush all remaining shards for the previous key (uses u64::MAX for last shard). + if let Some((prev_addr, prev_storage_key)) = current_key { + flush_storage_history_shards( + prev_addr, + prev_storage_key, + &mut current_list, + append_only, + writer, + )?; + } + + current_key = Some(partial_key); + current_list.clear(); + + // On incremental sync, merge with the existing last shard from the database. + // The last shard is stored with key (address, storage_key, u64::MAX) so we can find it. + if !append_only && + let Some(last_shard) = + writer.get_last_storage_history_shard(partial_key.0, partial_key.1)? + { + current_list.extend(last_shard.iter()); + } + } + + // Append new block numbers to the accumulator. + current_list.extend(new_list.iter()); + + // Flush complete shards, keeping the last (partial) shard buffered. 
+ flush_storage_history_shards_partial( + partial_key.0, + partial_key.1, + &mut current_list, + append_only, + writer, + )?; + } + + // Flush the final key's remaining shard. + if let Some((addr, storage_key)) = current_key { + flush_storage_history_shards(addr, storage_key, &mut current_list, append_only, writer)?; + } + + Ok(()) +} + +/// Flushes complete shards for storage history, keeping the trailing partial shard buffered. +/// +/// Only flushes when we have more than one shard's worth of data, keeping the last +/// (possibly partial) shard for continued accumulation. This avoids writing a shard +/// that may need to be updated when more indices arrive. +fn flush_storage_history_shards_partial( + address: Address, + storage_key: B256, + list: &mut Vec, + append_only: bool, + writer: &mut EitherWriter<'_, CURSOR, N>, +) -> Result<(), StageError> +where + N: NodePrimitives, + CURSOR: DbCursorRW + + DbCursorRO, +{ + // Nothing to flush if we haven't filled a complete shard yet. + if list.len() <= NUM_OF_INDICES_IN_SHARD { + return Ok(()); + } + + let num_full_shards = list.len() / NUM_OF_INDICES_IN_SHARD; + + // Always keep at least one shard buffered for continued accumulation. + // If len is exact multiple of shard size, keep the last full shard. + let shards_to_flush = if list.len().is_multiple_of(NUM_OF_INDICES_IN_SHARD) { + num_full_shards - 1 + } else { + num_full_shards + }; + + if shards_to_flush == 0 { + return Ok(()); + } + + // Split: flush the first N shards, keep the remainder buffered. + let flush_len = shards_to_flush * NUM_OF_INDICES_IN_SHARD; + let remainder = list.split_off(flush_len); + + // Write each complete shard with its highest block number as the key. 
+ for chunk in list.chunks(NUM_OF_INDICES_IN_SHARD) { + let highest = *chunk.last().expect("chunk is non-empty"); + let key = StorageShardedKey::new(address, storage_key, highest); + let value = BlockNumberList::new_pre_sorted(chunk.iter().copied()); + + if append_only { + writer.append_storage_history(key, &value)?; + } else { + writer.upsert_storage_history(key, &value)?; + } + } + + // Keep the remaining indices for the next iteration. + *list = remainder; + Ok(()) +} + +/// Flushes all remaining shards for storage history, using `u64::MAX` for the last shard. +/// +/// The `u64::MAX` key for the final shard is an invariant that allows +/// `seek_exact(address, storage_key, u64::MAX)` to find the last shard during incremental +/// sync for merging with new indices. +fn flush_storage_history_shards( + address: Address, + storage_key: B256, + list: &mut Vec, + append_only: bool, + writer: &mut EitherWriter<'_, CURSOR, N>, +) -> Result<(), StageError> +where + N: NodePrimitives, + CURSOR: DbCursorRW + + DbCursorRO, +{ + if list.is_empty() { + return Ok(()); + } + + let num_chunks = list.len().div_ceil(NUM_OF_INDICES_IN_SHARD); + + for (i, chunk) in list.chunks(NUM_OF_INDICES_IN_SHARD).enumerate() { + let is_last = i == num_chunks - 1; + + // Use u64::MAX for the final shard's key. This invariant allows incremental sync + // to find the last shard via seek_exact(address, storage_key, u64::MAX) for merging. 
+ let highest = if is_last { u64::MAX } else { *chunk.last().expect("chunk is non-empty") }; + + let key = StorageShardedKey::new(address, storage_key, highest); + let value = BlockNumberList::new_pre_sorted(chunk.iter().copied()); + + if append_only { + writer.append_storage_history(key, &value)?; + } else { + writer.upsert_storage_history(key, &value)?; + } + } + + list.clear(); + Ok(()) +} diff --git a/crates/storage/provider/src/either_writer.rs b/crates/storage/provider/src/either_writer.rs index 16eced90dd..c6ba79d031 100644 --- a/crates/storage/provider/src/either_writer.rs +++ b/crates/storage/provider/src/either_writer.rs @@ -13,7 +13,7 @@ use crate::{ providers::{history_info, HistoryInfo, StaticFileProvider, StaticFileProviderRWRefMut}, StaticFileProviderFactory, }; -use alloy_primitives::{map::HashMap, Address, BlockNumber, TxHash, TxNumber}; +use alloy_primitives::{map::HashMap, Address, BlockNumber, TxHash, TxNumber, B256}; use rayon::slice::ParallelSliceMut; use reth_db::{ cursor::{DbCursorRO, DbDupCursorRW}, @@ -512,6 +512,49 @@ where Self::RocksDB(batch) => batch.delete::(key), } } + + /// Appends a storage history entry (for first sync - more efficient). + pub fn append_storage_history( + &mut self, + key: StorageShardedKey, + value: &BlockNumberList, + ) -> ProviderResult<()> { + match self { + Self::Database(cursor) => Ok(cursor.append(key, value)?), + Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider), + #[cfg(all(unix, feature = "rocksdb"))] + Self::RocksDB(batch) => batch.put::(key, value), + } + } + + /// Upserts a storage history entry (for incremental sync). 
+ pub fn upsert_storage_history( + &mut self, + key: StorageShardedKey, + value: &BlockNumberList, + ) -> ProviderResult<()> { + match self { + Self::Database(cursor) => Ok(cursor.upsert(key, value)?), + Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider), + #[cfg(all(unix, feature = "rocksdb"))] + Self::RocksDB(batch) => batch.put::(key, value), + } + } + + /// Gets the last shard for an address and storage key (keyed with `u64::MAX`). + pub fn get_last_storage_history_shard( + &mut self, + address: Address, + storage_key: B256, + ) -> ProviderResult> { + let key = StorageShardedKey::last(address, storage_key); + match self { + Self::Database(cursor) => Ok(cursor.seek_exact(key)?.map(|(_, v)| v)), + Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider), + #[cfg(all(unix, feature = "rocksdb"))] + Self::RocksDB(batch) => batch.get::(key), + } + } } impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N> diff --git a/crates/storage/provider/src/providers/database/provider.rs b/crates/storage/provider/src/providers/database/provider.rs index a8032ae66a..9a41bc243d 100644 --- a/crates/storage/provider/src/providers/database/provider.rs +++ b/crates/storage/provider/src/providers/database/provider.rs @@ -3005,25 +3005,35 @@ impl HistoryWriter for DatabaseProvi .collect::>(); storage_changesets.sort_by_key(|(address, key, _)| (*address, *key)); - let mut cursor = self.tx.cursor_write::()?; - for &(address, storage_key, rem_index) in &storage_changesets { - let partial_shard = unwind_history_shards::<_, tables::StoragesHistory, _>( - &mut cursor, - StorageShardedKey::last(address, storage_key), - rem_index, - |storage_sharded_key| { - storage_sharded_key.address == address && - storage_sharded_key.sharded_key.key == storage_key - }, - )?; - - // Check the last returned partial shard. - // If it's not empty, the shard needs to be reinserted. 
- if !partial_shard.is_empty() { - cursor.insert( + if self.cached_storage_settings().storages_history_in_rocksdb { + #[cfg(all(unix, feature = "rocksdb"))] + { + let batch = + self.rocksdb_provider.unwind_storage_history_indices(&storage_changesets)?; + self.pending_rocksdb_batches.lock().push(batch); + } + } else { + // Unwind the storage history index in MDBX. + let mut cursor = self.tx.cursor_write::()?; + for &(address, storage_key, rem_index) in &storage_changesets { + let partial_shard = unwind_history_shards::<_, tables::StoragesHistory, _>( + &mut cursor, StorageShardedKey::last(address, storage_key), - &BlockNumberList::new_pre_sorted(partial_shard), + rem_index, + |storage_sharded_key| { + storage_sharded_key.address == address && + storage_sharded_key.sharded_key.key == storage_key + }, )?; + + // Check the last returned partial shard. + // If it's not empty, the shard needs to be reinserted. + if !partial_shard.is_empty() { + cursor.insert( + StorageShardedKey::last(address, storage_key), + &BlockNumberList::new_pre_sorted(partial_shard), + )?; + } } } diff --git a/crates/storage/provider/src/providers/rocksdb/provider.rs b/crates/storage/provider/src/providers/rocksdb/provider.rs index 129d8f1100..7824059086 100644 --- a/crates/storage/provider/src/providers/rocksdb/provider.rs +++ b/crates/storage/provider/src/providers/rocksdb/provider.rs @@ -814,6 +814,52 @@ impl RocksDBProvider { Ok(result) } + /// Returns all storage history shards for the given `(address, storage_key)` pair. + /// + /// Iterates through all shards in ascending `highest_block_number` order until + /// a different `(address, storage_key)` is encountered. 
+    pub fn storage_history_shards(
+        &self,
+        address: Address,
+        storage_key: B256,
+    ) -> ProviderResult<Vec<(StorageShardedKey, BlockNumberList)>> {
+        let cf = self.get_cf_handle::<tables::StoragesHistory>()?;
+
+        let start_key = StorageShardedKey::new(address, storage_key, 0u64);
+        let start_bytes = start_key.encode();
+
+        let iter = self
+            .0
+            .iterator_cf(cf, IteratorMode::From(start_bytes.as_ref(), rocksdb::Direction::Forward));
+
+        let mut result = Vec::new();
+        for item in iter {
+            match item {
+                Ok((key_bytes, value_bytes)) => {
+                    let key = StorageShardedKey::decode(&key_bytes)
+                        .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
+
+                    if key.address != address || key.sharded_key.key != storage_key {
+                        break;
+                    }
+
+                    let value = BlockNumberList::decompress(&value_bytes)
+                        .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
+
+                    result.push((key, value));
+                }
+                Err(e) => {
+                    return Err(ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
+                        message: e.to_string().into(),
+                        code: -1,
+                    })));
+                }
+            }
+        }
+
+        Ok(result)
+    }
+
     /// Unwinds account history indices for the given `(address, block_number)` pairs.
     ///
     /// Groups addresses by their minimum block number and calls the appropriate unwind
@@ -846,6 +892,37 @@ impl RocksDBProvider {
         Ok(batch.into_inner())
     }
 
+    /// Unwinds storage history indices for the given `(address, storage_key, block_number)` tuples.
+    ///
+    /// Groups by `(address, storage_key)` and finds the minimum block number for each.
+    /// For each key, keeps only blocks less than the minimum block
+    /// (i.e., removes the minimum block and all higher blocks).
+    ///
+    /// Returns a `WriteBatchWithTransaction` that can be committed later.
+    pub fn unwind_storage_history_indices(
+        &self,
+        storage_changesets: &[(Address, B256, BlockNumber)],
+    ) -> ProviderResult<WriteBatchWithTransaction<true>> {
+        let mut key_min_block: HashMap<(Address, B256), BlockNumber> =
+            HashMap::with_capacity_and_hasher(storage_changesets.len(), Default::default());
+        for &(address, storage_key, block_number) in storage_changesets {
+            key_min_block
+                .entry((address, storage_key))
+                .and_modify(|min| *min = (*min).min(block_number))
+                .or_insert(block_number);
+        }
+
+        let mut batch = self.batch();
+        for ((address, storage_key), min_block) in key_min_block {
+            match min_block.checked_sub(1) {
+                Some(keep_to) => batch.unwind_storage_history_to(address, storage_key, keep_to)?,
+                None => batch.clear_storage_history(address, storage_key)?,
+            }
+        }
+
+        Ok(batch.into_inner())
+    }
+
     /// Writes a batch of operations atomically.
     #[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
     pub fn write_batch<F>(&self, f: F) -> ProviderResult<()>
     where
@@ -1290,6 +1367,87 @@ impl<'a> RocksDBBatch<'a> {
         Ok(())
     }
 
+    /// Unwinds storage history to keep only blocks `<= keep_to`.
+    ///
+    /// Handles multi-shard scenarios by:
+    /// 1. Loading all shards for the `(address, storage_key)` pair
+    /// 2. Finding the boundary shard containing `keep_to`
+    /// 3. Deleting all shards after the boundary
+    /// 4. Truncating the boundary shard to keep only indices `<= keep_to`
+    /// 5. Ensuring the last shard is keyed with `u64::MAX`
+    pub fn unwind_storage_history_to(
+        &mut self,
+        address: Address,
+        storage_key: B256,
+        keep_to: BlockNumber,
+    ) -> ProviderResult<()> {
+        let shards = self.provider.storage_history_shards(address, storage_key)?;
+        if shards.is_empty() {
+            return Ok(());
+        }
+
+        // Find the first shard that might contain blocks > keep_to.
+        // A shard is affected if it's the sentinel (u64::MAX) or its highest_block_number > keep_to
+        let boundary_idx = shards.iter().position(|(key, _)| {
+            key.sharded_key.highest_block_number == u64::MAX ||
+                key.sharded_key.highest_block_number > keep_to
+        });
+
+        // Repair path: no shards affected means all blocks <= keep_to, just ensure sentinel exists
+        let Some(boundary_idx) = boundary_idx else {
+            let (last_key, last_value) = shards.last().expect("shards is non-empty");
+            if last_key.sharded_key.highest_block_number != u64::MAX {
+                self.delete::<tables::StoragesHistory>(last_key.clone())?;
+                self.put::<tables::StoragesHistory>(
+                    StorageShardedKey::last(address, storage_key),
+                    last_value,
+                )?;
+            }
+            return Ok(());
+        };
+
+        // Delete all shards strictly after the boundary (they are entirely > keep_to)
+        for (key, _) in shards.iter().skip(boundary_idx + 1) {
+            self.delete::<tables::StoragesHistory>(key.clone())?;
+        }
+
+        // Process the boundary shard: filter out blocks > keep_to
+        let (boundary_key, boundary_list) = &shards[boundary_idx];
+
+        // Delete the boundary shard (we'll either drop it or rewrite at u64::MAX)
+        self.delete::<tables::StoragesHistory>(boundary_key.clone())?;
+
+        // Build truncated list once; check emptiness directly (avoids double iteration)
+        let new_last =
+            BlockNumberList::new_pre_sorted(boundary_list.iter().take_while(|&b| b <= keep_to));
+
+        if new_last.is_empty() {
+            // Boundary shard is now empty. Previous shard becomes the last and must be keyed
+            // u64::MAX.
+            if boundary_idx == 0 {
+                // Nothing left for this (address, storage_key) pair
+                return Ok(());
+            }
+
+            let (prev_key, prev_value) = &shards[boundary_idx - 1];
+            if prev_key.sharded_key.highest_block_number != u64::MAX {
+                self.delete::<tables::StoragesHistory>(prev_key.clone())?;
+                self.put::<tables::StoragesHistory>(
+                    StorageShardedKey::last(address, storage_key),
+                    prev_value,
+                )?;
+            }
+            return Ok(());
+        }
+
+        self.put::<tables::StoragesHistory>(
+            StorageShardedKey::last(address, storage_key),
+            &new_last,
+        )?;
+
+        Ok(())
+    }
+
     /// Clears all account history shards for the given address.
     ///
     /// Used when unwinding from block 0 (i.e., removing all history).
@@ -1300,6 +1458,21 @@ impl<'a> RocksDBBatch<'a> {
         }
         Ok(())
     }
+
+    /// Clears all storage history shards for the given `(address, storage_key)` pair.
+    ///
+    /// Used when unwinding from block 0 (i.e., removing all history for this storage slot).
+    pub fn clear_storage_history(
+        &mut self,
+        address: Address,
+        storage_key: B256,
+    ) -> ProviderResult<()> {
+        let shards = self.provider.storage_history_shards(address, storage_key)?;
+        for (key, _) in shards {
+            self.delete::<tables::StoragesHistory>(key)?;
+        }
+        Ok(())
+    }
 }
 
 /// `RocksDB` transaction wrapper providing MDBX-like semantics.