From bd144a4c42758b4eec814807db562b1e1b4b64c8 Mon Sep 17 00:00:00 2001 From: YK Date: Tue, 20 Jan 2026 15:23:29 +0100 Subject: [PATCH] feat(stages): add RocksDB support for IndexAccountHistoryStage (#21165) Co-authored-by: Georgios Konstantopoulos Co-authored-by: joshieDo <93316087+joshieDo@users.noreply.github.com> --- .../src/stages/index_account_history.rs | 210 ++++++- crates/stages/stages/src/stages/utils.rs | 188 ++++++- crates/storage/provider/src/either_writer.rs | 37 +- .../src/providers/database/provider.rs | 40 +- .../src/providers/rocksdb/provider.rs | 511 +++++++++++++++++- .../provider/src/traits/rocksdb_provider.rs | 29 +- 6 files changed, 970 insertions(+), 45 deletions(-) diff --git a/crates/stages/stages/src/stages/index_account_history.rs b/crates/stages/stages/src/stages/index_account_history.rs index 25dbf10445..92fa5f3244 100644 --- a/crates/stages/stages/src/stages/index_account_history.rs +++ b/crates/stages/stages/src/stages/index_account_history.rs @@ -1,11 +1,10 @@ -use crate::stages::utils::collect_history_indices; - -use super::{collect_account_history_indices, load_history_indices}; -use alloy_primitives::Address; +use super::collect_account_history_indices; +use crate::stages::utils::{collect_history_indices, load_account_history}; use reth_config::config::{EtlConfig, IndexHistoryConfig}; -use reth_db_api::{models::ShardedKey, table::Decode, tables, transaction::DbTxMut}; +use reth_db_api::{models::ShardedKey, tables, transaction::DbTxMut}; use reth_provider::{ - DBProvider, HistoryWriter, PruneCheckpointReader, PruneCheckpointWriter, StorageSettingsCache, + DBProvider, EitherWriter, HistoryWriter, PruneCheckpointReader, PruneCheckpointWriter, + RocksDBProviderFactory, StorageSettingsCache, }; use reth_prune_types::{PruneCheckpoint, PruneMode, PrunePurpose, PruneSegment}; use reth_stages_api::{ @@ -53,7 +52,8 @@ where + PruneCheckpointWriter + reth_storage_api::ChangeSetReader + reth_provider::StaticFileProviderFactory - + StorageSettingsCache, + + StorageSettingsCache + + RocksDBProviderFactory, { /// Return the id of the stage fn id(&self) -> StageId { @@ -101,15 +101,25 @@ where let mut range = input.next_block_range(); let first_sync = input.checkpoint().block_number == 0; + let use_rocksdb = provider.cached_storage_settings().account_history_in_rocksdb; // On first sync we might have history coming from genesis. We clear the table since it's // faster to rebuild from scratch. if first_sync { - provider.tx_ref().clear::()?; + if use_rocksdb { + // Note: RocksDB clear() executes immediately (not deferred to commit like MDBX), + // but this is safe for first_sync because if we crash before commit, the + // checkpoint stays at 0 and we'll just clear and rebuild again on restart. The + // source data (changesets) is intact. + #[cfg(all(unix, feature = "rocksdb"))] + provider.rocksdb_provider().clear::()?; + } else { + provider.tx_ref().clear::()?; + } range = 0..=*input.next_block_range().end(); } - info!(target: "sync::stages::index_account_history::exec", ?first_sync, "Collecting indices"); + info!(target: "sync::stages::index_account_history::exec", ?first_sync, ?use_rocksdb, "Collecting indices"); let collector = if provider.cached_storage_settings().account_changesets_in_static_files { // Use the provider-based collection that can read from static files. 
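The hunk above selects the history backend from `cached_storage_settings().account_history_in_rocksdb` and clears the matching store on first sync. A minimal sketch of that selection as a standalone helper, assuming the same bounds the stage itself declares (`DBProvider<Tx: DbTxMut>`, `StorageSettingsCache`, `RocksDBProviderFactory`) and the corresponding imports; the helper name `clear_accounts_history_store` is illustrative and not part of this patch:

    fn clear_accounts_history_store<P>(provider: &P) -> Result<(), StageError>
    where
        P: DBProvider<Tx: DbTxMut> + StorageSettingsCache + RocksDBProviderFactory,
    {
        if provider.cached_storage_settings().account_history_in_rocksdb {
            // RocksDB delete-range executes immediately rather than at commit; this is safe
            // on first sync because the checkpoint is still 0 and the changesets are intact.
            #[cfg(all(unix, feature = "rocksdb"))]
            provider.rocksdb_provider().clear::<tables::AccountsHistory>()?;
        } else {
            // The MDBX clear is deferred until the provider transaction commits.
            provider.tx_ref().clear::<tables::AccountsHistory>()?;
        }
        Ok(())
    }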
@@ -125,14 +135,13 @@ where }; info!(target: "sync::stages::index_account_history::exec", "Loading indices into database"); - load_history_indices::<_, tables::AccountsHistory, _>( - provider, - collector, - first_sync, - ShardedKey::new, - ShardedKey::
::decode_owned, - |key| key.key, - )?; + + provider.with_rocksdb_batch(|rocksdb_batch| { + let mut writer = EitherWriter::new_accounts_history(provider, rocksdb_batch)?; + load_account_history(collector, first_sync, &mut writer) + .map_err(|e| reth_provider::ProviderError::other(Box::new(e)))?; + Ok(((), writer.into_raw_rocksdb_batch())) + })?; Ok(ExecOutput { checkpoint: StageCheckpoint::new(*range.end()), done: true }) } @@ -160,7 +169,7 @@ mod tests { stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, TestRunnerError, TestStageDB, UnwindStageTestRunner, }; - use alloy_primitives::{address, BlockNumber, B256}; + use alloy_primitives::{address, Address, BlockNumber, B256}; use itertools::Itertools; use reth_db_api::{ cursor::DbCursorRO, @@ -646,4 +655,169 @@ mod tests { Ok(()) } } + + #[cfg(all(unix, feature = "rocksdb"))] + mod rocksdb_tests { + use super::*; + use reth_provider::RocksDBProviderFactory; + use reth_storage_api::StorageSettings; + + /// Test that when `account_history_in_rocksdb` is enabled, the stage + /// writes account history indices to `RocksDB` instead of MDBX. + #[tokio::test] + async fn execute_writes_to_rocksdb_when_enabled() { + // init + let db = TestStageDB::default(); + + // Enable RocksDB for account history + db.factory.set_storage_settings_cache( + StorageSettings::legacy().with_account_history_in_rocksdb(true), + ); + + db.commit(|tx| { + for block in 0..=10 { + tx.put::( + block, + StoredBlockBodyIndices { tx_count: 3, ..Default::default() }, + )?; + tx.put::(block, acc())?; + } + Ok(()) + }) + .unwrap(); + + let input = ExecInput { target: Some(10), ..Default::default() }; + let mut stage = IndexAccountHistoryStage::default(); + let provider = db.factory.database_provider_rw().unwrap(); + let out = stage.execute(&provider, input).unwrap(); + assert_eq!(out, ExecOutput { checkpoint: StageCheckpoint::new(10), done: true }); + provider.commit().unwrap(); + + // Verify MDBX table is empty (data should be in RocksDB) + let mdbx_table = db.table::().unwrap(); + assert!( + mdbx_table.is_empty(), + "MDBX AccountsHistory should be empty when RocksDB is enabled" + ); + + // Verify RocksDB has the data + let rocksdb = db.factory.rocksdb_provider(); + let result = rocksdb.get::(shard(u64::MAX)).unwrap(); + assert!(result.is_some(), "RocksDB should contain account history"); + + let block_list = result.unwrap(); + let blocks: Vec = block_list.iter().collect(); + assert_eq!(blocks, (0..=10).collect::>()); + } + + /// Test that unwind works correctly when `account_history_in_rocksdb` is enabled. 
+ #[tokio::test] + async fn unwind_works_when_rocksdb_enabled() { + let db = TestStageDB::default(); + + db.factory.set_storage_settings_cache( + StorageSettings::legacy().with_account_history_in_rocksdb(true), + ); + + db.commit(|tx| { + for block in 0..=10 { + tx.put::( + block, + StoredBlockBodyIndices { tx_count: 3, ..Default::default() }, + )?; + tx.put::(block, acc())?; + } + Ok(()) + }) + .unwrap(); + + let input = ExecInput { target: Some(10), ..Default::default() }; + let mut stage = IndexAccountHistoryStage::default(); + let provider = db.factory.database_provider_rw().unwrap(); + let out = stage.execute(&provider, input).unwrap(); + assert_eq!(out, ExecOutput { checkpoint: StageCheckpoint::new(10), done: true }); + provider.commit().unwrap(); + + // Verify RocksDB has blocks 0-10 before unwind + let rocksdb = db.factory.rocksdb_provider(); + let result = rocksdb.get::(shard(u64::MAX)).unwrap(); + assert!(result.is_some(), "RocksDB should have data before unwind"); + let blocks_before: Vec = result.unwrap().iter().collect(); + assert_eq!(blocks_before, (0..=10).collect::>()); + + // Unwind to block 5 (remove blocks 6-10) + let unwind_input = + UnwindInput { checkpoint: StageCheckpoint::new(10), unwind_to: 5, bad_block: None }; + let provider = db.factory.database_provider_rw().unwrap(); + let out = stage.unwind(&provider, unwind_input).unwrap(); + assert_eq!(out, UnwindOutput { checkpoint: StageCheckpoint::new(5) }); + provider.commit().unwrap(); + + // Verify RocksDB now only has blocks 0-5 (blocks 6-10 removed) + let rocksdb = db.factory.rocksdb_provider(); + let result = rocksdb.get::(shard(u64::MAX)).unwrap(); + assert!(result.is_some(), "RocksDB should still have data after unwind"); + let blocks_after: Vec = result.unwrap().iter().collect(); + assert_eq!(blocks_after, (0..=5).collect::>(), "Should only have blocks 0-5"); + } + + /// Test incremental sync merges new data with existing shards. 
+ #[tokio::test] + async fn execute_incremental_sync() { + let db = TestStageDB::default(); + + db.factory.set_storage_settings_cache( + StorageSettings::legacy().with_account_history_in_rocksdb(true), + ); + + db.commit(|tx| { + for block in 0..=5 { + tx.put::( + block, + StoredBlockBodyIndices { tx_count: 3, ..Default::default() }, + )?; + tx.put::(block, acc())?; + } + Ok(()) + }) + .unwrap(); + + let input = ExecInput { target: Some(5), ..Default::default() }; + let mut stage = IndexAccountHistoryStage::default(); + let provider = db.factory.database_provider_rw().unwrap(); + let out = stage.execute(&provider, input).unwrap(); + assert_eq!(out, ExecOutput { checkpoint: StageCheckpoint::new(5), done: true }); + provider.commit().unwrap(); + + let rocksdb = db.factory.rocksdb_provider(); + let result = rocksdb.get::(shard(u64::MAX)).unwrap(); + assert!(result.is_some()); + let blocks: Vec = result.unwrap().iter().collect(); + assert_eq!(blocks, (0..=5).collect::>()); + + db.commit(|tx| { + for block in 6..=10 { + tx.put::( + block, + StoredBlockBodyIndices { tx_count: 3, ..Default::default() }, + )?; + tx.put::(block, acc())?; + } + Ok(()) + }) + .unwrap(); + + let input = ExecInput { target: Some(10), checkpoint: Some(StageCheckpoint::new(5)) }; + let provider = db.factory.database_provider_rw().unwrap(); + let out = stage.execute(&provider, input).unwrap(); + assert_eq!(out, ExecOutput { checkpoint: StageCheckpoint::new(10), done: true }); + provider.commit().unwrap(); + + let rocksdb = db.factory.rocksdb_provider(); + let result = rocksdb.get::(shard(u64::MAX)).unwrap(); + assert!(result.is_some(), "RocksDB should have merged data"); + let blocks: Vec = result.unwrap().iter().collect(); + assert_eq!(blocks, (0..=10).collect::>()); + } + } } diff --git a/crates/stages/stages/src/stages/utils.rs b/crates/stages/stages/src/stages/utils.rs index 82760a09e2..7d7d5612b9 100644 --- a/crates/stages/stages/src/stages/utils.rs +++ b/crates/stages/stages/src/stages/utils.rs @@ -4,13 +4,14 @@ use reth_config::config::EtlConfig; use reth_db_api::{ cursor::{DbCursorRO, DbCursorRW}, models::{sharded_key::NUM_OF_INDICES_IN_SHARD, AccountBeforeTx, ShardedKey}, - table::{Decompress, Table}, + table::{Decode, Decompress, Table}, transaction::{DbTx, DbTxMut}, BlockNumberList, DatabaseError, }; use reth_etl::Collector; +use reth_primitives_traits::NodePrimitives; use reth_provider::{ - providers::StaticFileProvider, to_range, BlockReader, DBProvider, ProviderError, + providers::StaticFileProvider, to_range, BlockReader, DBProvider, EitherWriter, ProviderError, StaticFileProviderFactory, }; use reth_stages_api::StageError; @@ -108,7 +109,7 @@ where for (address, indices) in cache { insert_fn(address, indices)? } - Ok::<(), StageError>(()) + Ok(()) } /// Collects account history indices using a provider that implements `ChangeSetReader`. @@ -124,12 +125,12 @@ where let mut cache: HashMap> = HashMap::default(); let mut insert_fn = |address: Address, indices: Vec| { - let last = indices.last().expect("qed"); + let last = indices.last().expect("indices is non-empty"); collector.insert( ShardedKey::new(address, *last), BlockNumberList::new_pre_sorted(indices.into_iter()), )?; - Ok::<(), StageError>(()) + Ok(()) }; // Convert range bounds to concrete range @@ -320,6 +321,183 @@ impl LoadMode { } } +/// Loads account history indices into the database via `EitherWriter`. +/// +/// Similar to [`load_history_indices`] but works with [`EitherWriter`] to support +/// both MDBX and `RocksDB` backends. 
+///
+/// ## Process
+/// Iterates over elements, grouping indices by their address. It flushes indices to disk
+/// when reaching a shard's max length (`NUM_OF_INDICES_IN_SHARD`) or when the address changes,
+/// ensuring the previous address's final shard is stored.
+///
+/// Uses `Option<Address>` instead of `Address::default()` as the sentinel to avoid
+/// incorrectly treating `Address::ZERO` as "no previous address".
+pub(crate) fn load_account_history<N, CURSOR>(
+    mut collector: Collector<ShardedKey<Address>, BlockNumberList>,
+    append_only: bool,
+    writer: &mut EitherWriter<'_, CURSOR, N>,
+) -> Result<(), StageError>
+where
+    N: NodePrimitives,
+    CURSOR: DbCursorRW<tables::AccountsHistory>
+        + DbCursorRO<tables::AccountsHistory>,
+{
+    let mut current_address: Option<Address> = None;
+    // Accumulator for block numbers where the current address changed.
+    let mut current_list = Vec::<u64>::new();
+
+    let total_entries = collector.len();
+    let interval = (total_entries / 10).max(1);
+
+    for (index, element) in collector.iter()?.enumerate() {
+        let (k, v) = element?;
+        let sharded_key = ShardedKey::<Address>
::decode_owned(k)?; + let new_list = BlockNumberList::decompress_owned(v)?; + + if index > 0 && index.is_multiple_of(interval) && total_entries > 10 { + info!(target: "sync::stages::index_history", progress = %format!("{:.2}%", (index as f64 / total_entries as f64) * 100.0), "Writing indices"); + } + + let address = sharded_key.key; + + // When address changes, flush the previous address's shards and start fresh. + if current_address != Some(address) { + // Flush all remaining shards for the previous address (uses u64::MAX for last shard). + if let Some(prev_addr) = current_address { + flush_account_history_shards(prev_addr, &mut current_list, append_only, writer)?; + } + + current_address = Some(address); + current_list.clear(); + + // On incremental sync, merge with the existing last shard from the database. + // The last shard is stored with key (address, u64::MAX) so we can find it. + if !append_only && + let Some(last_shard) = writer.get_last_account_history_shard(address)? + { + current_list.extend(last_shard.iter()); + } + } + + // Append new block numbers to the accumulator. + current_list.extend(new_list.iter()); + + // Flush complete shards, keeping the last (partial) shard buffered. + flush_account_history_shards_partial(address, &mut current_list, append_only, writer)?; + } + + // Flush the final address's remaining shard. + if let Some(addr) = current_address { + flush_account_history_shards(addr, &mut current_list, append_only, writer)?; + } + + Ok(()) +} + +/// Flushes complete shards for account history, keeping the trailing partial shard buffered. +/// +/// Only flushes when we have more than one shard's worth of data, keeping the last +/// (possibly partial) shard for continued accumulation. This avoids writing a shard +/// that may need to be updated when more indices arrive. +/// +/// Equivalent to [`load_indices`] with [`LoadMode::KeepLast`]. +fn flush_account_history_shards_partial( + address: Address, + list: &mut Vec, + append_only: bool, + writer: &mut EitherWriter<'_, CURSOR, N>, +) -> Result<(), StageError> +where + N: NodePrimitives, + CURSOR: DbCursorRW + + DbCursorRO, +{ + // Nothing to flush if we haven't filled a complete shard yet. + if list.len() <= NUM_OF_INDICES_IN_SHARD { + return Ok(()); + } + + let num_full_shards = list.len() / NUM_OF_INDICES_IN_SHARD; + + // Always keep at least one shard buffered for continued accumulation. + // If len is exact multiple of shard size, keep the last full shard. + let shards_to_flush = if list.len().is_multiple_of(NUM_OF_INDICES_IN_SHARD) { + num_full_shards - 1 + } else { + num_full_shards + }; + + if shards_to_flush == 0 { + return Ok(()); + } + + // Split: flush the first N shards, keep the remainder buffered. + let flush_len = shards_to_flush * NUM_OF_INDICES_IN_SHARD; + let remainder = list.split_off(flush_len); + + // Write each complete shard with its highest block number as the key. + for chunk in list.chunks(NUM_OF_INDICES_IN_SHARD) { + let highest = *chunk.last().expect("chunk is non-empty"); + let key = ShardedKey::new(address, highest); + let value = BlockNumberList::new_pre_sorted(chunk.iter().copied()); + + if append_only { + writer.append_account_history(key, &value)?; + } else { + writer.upsert_account_history(key, &value)?; + } + } + + // Keep the remaining indices for the next iteration. + *list = remainder; + Ok(()) +} + +/// Flushes all remaining shards for account history, using `u64::MAX` for the last shard. 
+/// +/// The `u64::MAX` key for the final shard is an invariant that allows `seek_exact(address, +/// u64::MAX)` to find the last shard during incremental sync for merging with new indices. +/// +/// Equivalent to [`load_indices`] with [`LoadMode::Flush`]. +fn flush_account_history_shards( + address: Address, + list: &mut Vec, + append_only: bool, + writer: &mut EitherWriter<'_, CURSOR, N>, +) -> Result<(), StageError> +where + N: NodePrimitives, + CURSOR: DbCursorRW + + DbCursorRO, +{ + if list.is_empty() { + return Ok(()); + } + + let num_chunks = list.len().div_ceil(NUM_OF_INDICES_IN_SHARD); + + for (i, chunk) in list.chunks(NUM_OF_INDICES_IN_SHARD).enumerate() { + let is_last = i == num_chunks - 1; + + // Use u64::MAX for the final shard's key. This invariant allows incremental sync + // to find the last shard via seek_exact(address, u64::MAX) for merging. + let highest = if is_last { u64::MAX } else { *chunk.last().expect("chunk is non-empty") }; + + let key = ShardedKey::new(address, highest); + let value = BlockNumberList::new_pre_sorted(chunk.iter().copied()); + + if append_only { + writer.append_account_history(key, &value)?; + } else { + writer.upsert_account_history(key, &value)?; + } + } + + list.clear(); + Ok(()) +} + /// Called when database is ahead of static files. Attempts to find the first block we are missing /// transactions for. pub(crate) fn missing_static_data_error( diff --git a/crates/storage/provider/src/either_writer.rs b/crates/storage/provider/src/either_writer.rs index 5cc79d8522..16eced90dd 100644 --- a/crates/storage/provider/src/either_writer.rs +++ b/crates/storage/provider/src/either_writer.rs @@ -518,8 +518,22 @@ impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N> where CURSOR: DbCursorRW + DbCursorRO, { - /// Puts an account history entry. - pub fn put_account_history( + /// Appends an account history entry (for first sync - more efficient). + pub fn append_account_history( + &mut self, + key: ShardedKey
, + value: &BlockNumberList, + ) -> ProviderResult<()> { + match self { + Self::Database(cursor) => Ok(cursor.append(key, value)?), + Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider), + #[cfg(all(unix, feature = "rocksdb"))] + Self::RocksDB(batch) => batch.put::(key, value), + } + } + + /// Upserts an account history entry (for incremental sync). + pub fn upsert_account_history( &mut self, key: ShardedKey
, value: &BlockNumberList, @@ -532,6 +546,21 @@ where } } + /// Gets the last shard for an address (keyed with `u64::MAX`). + pub fn get_last_account_history_shard( + &mut self, + address: Address, + ) -> ProviderResult> { + match self { + Self::Database(cursor) => { + Ok(cursor.seek_exact(ShardedKey::last(address))?.map(|(_, v)| v)) + } + Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider), + #[cfg(all(unix, feature = "rocksdb"))] + Self::RocksDB(batch) => batch.get::(ShardedKey::last(address)), + } + } + /// Deletes an account history entry. pub fn delete_account_history(&mut self, key: ShardedKey
) -> ProviderResult<()> { match self { @@ -1266,8 +1295,8 @@ mod rocksdb_tests { for (highest_block, blocks) in shards { let key = ShardedKey::new(address, *highest_block); let value = IntegerList::new(blocks.clone()).unwrap(); - mdbx_writer.put_account_history(key.clone(), &value).unwrap(); - rocks_writer.put_account_history(key, &value).unwrap(); + mdbx_writer.upsert_account_history(key.clone(), &value).unwrap(); + rocks_writer.upsert_account_history(key, &value).unwrap(); } // Commit both backends diff --git a/crates/storage/provider/src/providers/database/provider.rs b/crates/storage/provider/src/providers/database/provider.rs index 0fe7d85472..a8032ae66a 100644 --- a/crates/storage/provider/src/providers/database/provider.rs +++ b/crates/storage/provider/src/providers/database/provider.rs @@ -2939,25 +2939,33 @@ impl HistoryWriter for DatabaseProvi .into_iter() .map(|(index, account)| (account.address, *index)) .collect::>(); - last_indices.sort_by_key(|(a, _)| *a); + last_indices.sort_unstable_by_key(|(a, _)| *a); - // Unwind the account history index. - let mut cursor = self.tx.cursor_write::()?; - for &(address, rem_index) in &last_indices { - let partial_shard = unwind_history_shards::<_, tables::AccountsHistory, _>( - &mut cursor, - ShardedKey::last(address), - rem_index, - |sharded_key| sharded_key.key == address, - )?; - - // Check the last returned partial shard. - // If it's not empty, the shard needs to be reinserted. - if !partial_shard.is_empty() { - cursor.insert( + if self.cached_storage_settings().account_history_in_rocksdb { + #[cfg(all(unix, feature = "rocksdb"))] + { + let batch = self.rocksdb_provider.unwind_account_history_indices(&last_indices)?; + self.pending_rocksdb_batches.lock().push(batch); + } + } else { + // Unwind the account history index in MDBX. + let mut cursor = self.tx.cursor_write::()?; + for &(address, rem_index) in &last_indices { + let partial_shard = unwind_history_shards::<_, tables::AccountsHistory, _>( + &mut cursor, ShardedKey::last(address), - &BlockNumberList::new_pre_sorted(partial_shard), + rem_index, + |sharded_key| sharded_key.key == address, )?; + + // Check the last returned partial shard. + // If it's not empty, the shard needs to be reinserted. + if !partial_shard.is_empty() { + cursor.insert( + ShardedKey::last(address), + &BlockNumberList::new_pre_sorted(partial_shard), + )?; + } } } diff --git a/crates/storage/provider/src/providers/rocksdb/provider.rs b/crates/storage/provider/src/providers/rocksdb/provider.rs index cc427fcb8b..1a1ecbd8f6 100644 --- a/crates/storage/provider/src/providers/rocksdb/provider.rs +++ b/crates/storage/provider/src/providers/rocksdb/provider.rs @@ -25,7 +25,7 @@ use rocksdb::{ OptimisticTransactionOptions, Options, Transaction, WriteBatchWithTransaction, WriteOptions, }; use std::{ - collections::BTreeMap, + collections::{BTreeMap, HashMap}, fmt, path::{Path, PathBuf}, sync::Arc, @@ -430,6 +430,24 @@ impl RocksDBProvider { }) } + /// Clears all entries from the specified table. + /// + /// Uses `delete_range_cf` from empty key to a max key (256 bytes of 0xFF). + /// This end key must exceed the maximum encoded key size for any table. + /// Current max is ~60 bytes (`StorageShardedKey` = 20 + 32 + 8). 
+ pub fn clear(&self) -> ProviderResult<()> { + let cf = self.get_cf_handle::()?; + + self.0.db.delete_range_cf(cf, &[] as &[u8], &[0xFF; 256]).map_err(|e| { + ProviderError::Database(DatabaseError::Delete(DatabaseErrorInfo { + message: e.to_string().into(), + code: -1, + })) + })?; + + Ok(()) + } + /// Gets the first (smallest key) entry from the specified table. pub fn first(&self) -> ProviderResult> { self.execute_with_operation_metric(RocksDBOperation::Get, T::NAME, |this| { @@ -489,6 +507,90 @@ impl RocksDBProvider { Ok(RocksDBIter { inner: iter, _marker: std::marker::PhantomData }) } + /// Returns all account history shards for the given address in ascending key order. + /// + /// This is used for unwind operations where we need to scan all shards for an address + /// and potentially delete or truncate them. + pub fn account_history_shards( + &self, + address: Address, + ) -> ProviderResult, BlockNumberList)>> { + // Get the column family handle for the AccountsHistory table. + let cf = self.get_cf_handle::()?; + + // Build a seek key starting at the first shard (highest_block_number = 0) for this address. + // ShardedKey is (address, highest_block_number) so this positions us at the beginning. + let start_key = ShardedKey::new(address, 0u64); + let start_bytes = start_key.encode(); + + // Create a forward iterator starting from our seek position. + let iter = self + .0 + .db + .iterator_cf(cf, IteratorMode::From(start_bytes.as_ref(), rocksdb::Direction::Forward)); + + let mut result = Vec::new(); + for item in iter { + match item { + Ok((key_bytes, value_bytes)) => { + // Decode the sharded key to check if we're still on the same address. + let key = ShardedKey::
::decode(&key_bytes) + .map_err(|_| ProviderError::Database(DatabaseError::Decode))?; + + // Stop when we reach a different address (keys are sorted by address first). + if key.key != address { + break; + } + + // Decompress the block number list stored in this shard. + let value = BlockNumberList::decompress(&value_bytes) + .map_err(|_| ProviderError::Database(DatabaseError::Decode))?; + + result.push((key, value)); + } + Err(e) => { + return Err(ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo { + message: e.to_string().into(), + code: -1, + }))); + } + } + } + + Ok(result) + } + + /// Unwinds account history indices for the given `(address, block_number)` pairs. + /// + /// Groups addresses by their minimum block number and calls the appropriate unwind + /// operations. For each address, keeps only blocks less than the minimum block + /// (i.e., removes the minimum block and all higher blocks). + /// + /// Returns a `WriteBatchWithTransaction` that can be committed later. + pub fn unwind_account_history_indices( + &self, + last_indices: &[(Address, BlockNumber)], + ) -> ProviderResult> { + let mut address_min_block: HashMap = + HashMap::with_capacity_and_hasher(last_indices.len(), Default::default()); + for &(address, block_number) in last_indices { + address_min_block + .entry(address) + .and_modify(|min| *min = (*min).min(block_number)) + .or_insert(block_number); + } + + let mut batch = self.batch(); + for (address, min_block) in address_min_block { + match min_block.checked_sub(1) { + Some(keep_to) => batch.unwind_account_history_to(address, keep_to)?, + None => batch.clear_account_history(address)?, + } + } + + Ok(batch.into_inner()) + } + /// Writes a batch of operations atomically. pub fn write_batch(&self, f: F) -> ProviderResult<()> where @@ -719,6 +821,14 @@ impl<'a> RocksDBBatch<'a> { self.inner } + /// Gets a value from the database. + /// + /// **Important constraint:** This reads only committed state, not pending writes in this + /// batch or other pending batches in `pending_rocksdb_batches`. + pub fn get(&self, key: T::Key) -> ProviderResult> { + self.provider.get::(key) + } + /// Appends indices to an account history shard with proper shard management. /// /// Loads the existing shard (if any), appends new indices, and rechunks into @@ -841,6 +951,91 @@ impl<'a> RocksDBBatch<'a> { Ok(()) } + + /// Unwinds account history for the given address, keeping only blocks <= `keep_to`. + /// + /// Mirrors MDBX `unwind_history_shards` behavior: + /// - Deletes shards entirely above `keep_to` + /// - Truncates boundary shards and re-keys to `u64::MAX` sentinel + /// - Preserves shards entirely below `keep_to` + pub fn unwind_account_history_to( + &mut self, + address: Address, + keep_to: BlockNumber, + ) -> ProviderResult<()> { + let shards = self.provider.account_history_shards(address)?; + if shards.is_empty() { + return Ok(()); + } + + // Find the first shard that might contain blocks > keep_to. 
+ // A shard is affected if it's the sentinel (u64::MAX) or its highest_block_number > keep_to + let boundary_idx = shards.iter().position(|(key, _)| { + key.highest_block_number == u64::MAX || key.highest_block_number > keep_to + }); + + // Repair path: no shards affected means all blocks <= keep_to, just ensure sentinel exists + let Some(boundary_idx) = boundary_idx else { + let (last_key, last_value) = shards.last().expect("shards is non-empty"); + if last_key.highest_block_number != u64::MAX { + self.delete::(last_key.clone())?; + self.put::( + ShardedKey::new(address, u64::MAX), + last_value, + )?; + } + return Ok(()); + }; + + // Delete all shards strictly after the boundary (they are entirely > keep_to) + for (key, _) in shards.iter().skip(boundary_idx + 1) { + self.delete::(key.clone())?; + } + + // Process the boundary shard: filter out blocks > keep_to + let (boundary_key, boundary_list) = &shards[boundary_idx]; + + // Delete the boundary shard (we'll either drop it or rewrite at u64::MAX) + self.delete::(boundary_key.clone())?; + + // Build truncated list once; check emptiness directly (avoids double iteration) + let new_last = + BlockNumberList::new_pre_sorted(boundary_list.iter().take_while(|&b| b <= keep_to)); + + if new_last.is_empty() { + // Boundary shard is now empty. Previous shard becomes the last and must be keyed + // u64::MAX. + if boundary_idx == 0 { + // Nothing left for this address + return Ok(()); + } + + let (prev_key, prev_value) = &shards[boundary_idx - 1]; + if prev_key.highest_block_number != u64::MAX { + self.delete::(prev_key.clone())?; + self.put::( + ShardedKey::new(address, u64::MAX), + prev_value, + )?; + } + return Ok(()); + } + + self.put::(ShardedKey::new(address, u64::MAX), &new_last)?; + + Ok(()) + } + + /// Clears all account history shards for the given address. + /// + /// Used when unwinding from block 0 (i.e., removing all history). + pub fn clear_account_history(&mut self, address: Address) -> ProviderResult<()> { + let shards = self.provider.account_history_shards(address)?; + for (key, _) in shards { + self.delete::(key)?; + } + Ok(()) + } } /// `RocksDB` transaction wrapper providing MDBX-like semantics. 
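To make the grouping contract of `unwind_account_history_indices` above concrete: every `(address, block_number)` pair collapses to the minimum block per address, and the unwind target becomes `min_block - 1`, or a full clear when the minimum is 0. A self-contained sketch of just that arithmetic, using plain `std` types so it runs without a provider; `unwind_targets` is an illustrative name, not an API in this patch:

    use std::collections::HashMap;

    /// `None` means "clear every shard for this address"; `Some(keep_to)` means keep only
    /// blocks `<= keep_to` (mirrors `min_block.checked_sub(1)` in the batch code above).
    fn unwind_targets(last_indices: &[([u8; 20], u64)]) -> HashMap<[u8; 20], Option<u64>> {
        let mut min_per_address: HashMap<[u8; 20], u64> = HashMap::new();
        for &(address, block) in last_indices {
            min_per_address
                .entry(address)
                .and_modify(|min| *min = (*min).min(block))
                .or_insert(block);
        }
        min_per_address.into_iter().map(|(addr, min)| (addr, min.checked_sub(1))).collect()
    }

    #[test]
    fn unwind_targets_match_the_docs() {
        let (a, b) = ([0x0a; 20], [0x0b; 20]);
        // `a` changed at blocks 7 and 9; `b` changed at block 0.
        let targets = unwind_targets(&[(a, 9), (a, 7), (b, 0)]);
        assert_eq!(targets[&a], Some(6)); // keep 0..=6, drop 7 and above
        assert_eq!(targets[&b], None); // minimum is 0, so clear all shards for `b`
    }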
@@ -1720,4 +1915,318 @@ mod tests { "sentinel shard should exist" ); } + + #[test] + fn test_clear_table() { + let temp_dir = TempDir::new().unwrap(); + let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap(); + + let address = Address::from([0x42; 20]); + let key = ShardedKey::new(address, u64::MAX); + let blocks = BlockNumberList::new_pre_sorted([1, 2, 3]); + + provider.put::(key.clone(), &blocks).unwrap(); + assert!(provider.get::(key.clone()).unwrap().is_some()); + + provider.clear::().unwrap(); + + assert!( + provider.get::(key).unwrap().is_none(), + "table should be empty after clear" + ); + assert!( + provider.first::().unwrap().is_none(), + "first() should return None after clear" + ); + } + + #[test] + fn test_clear_empty_table() { + let temp_dir = TempDir::new().unwrap(); + let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap(); + + assert!(provider.first::().unwrap().is_none()); + + provider.clear::().unwrap(); + + assert!(provider.first::().unwrap().is_none()); + } + + #[test] + fn test_unwind_account_history_to_basic() { + let temp_dir = TempDir::new().unwrap(); + let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap(); + + let address = Address::from([0x42; 20]); + + // Add blocks 0-10 + let mut batch = provider.batch(); + batch.append_account_history_shard(address, 0..=10).unwrap(); + batch.commit().unwrap(); + + // Verify we have blocks 0-10 + let key = ShardedKey::new(address, u64::MAX); + let result = provider.get::(key.clone()).unwrap(); + assert!(result.is_some()); + let blocks: Vec = result.unwrap().iter().collect(); + assert_eq!(blocks, (0..=10).collect::>()); + + // Unwind to block 5 (keep blocks 0-5, remove 6-10) + let mut batch = provider.batch(); + batch.unwind_account_history_to(address, 5).unwrap(); + batch.commit().unwrap(); + + // Verify only blocks 0-5 remain + let result = provider.get::(key).unwrap(); + assert!(result.is_some()); + let blocks: Vec = result.unwrap().iter().collect(); + assert_eq!(blocks, (0..=5).collect::>()); + } + + #[test] + fn test_unwind_account_history_to_removes_all() { + let temp_dir = TempDir::new().unwrap(); + let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap(); + + let address = Address::from([0x42; 20]); + + // Add blocks 5-10 + let mut batch = provider.batch(); + batch.append_account_history_shard(address, 5..=10).unwrap(); + batch.commit().unwrap(); + + // Unwind to block 4 (removes all blocks since they're all > 4) + let mut batch = provider.batch(); + batch.unwind_account_history_to(address, 4).unwrap(); + batch.commit().unwrap(); + + // Verify no data remains for this address + let key = ShardedKey::new(address, u64::MAX); + let result = provider.get::(key).unwrap(); + assert!(result.is_none(), "Should have no data after full unwind"); + } + + #[test] + fn test_unwind_account_history_to_no_op() { + let temp_dir = TempDir::new().unwrap(); + let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap(); + + let address = Address::from([0x42; 20]); + + // Add blocks 0-5 + let mut batch = provider.batch(); + batch.append_account_history_shard(address, 0..=5).unwrap(); + batch.commit().unwrap(); + + // Unwind to block 10 (no-op since all blocks are <= 10) + let mut batch = provider.batch(); + batch.unwind_account_history_to(address, 10).unwrap(); + batch.commit().unwrap(); + + // Verify blocks 0-5 still remain + let key = ShardedKey::new(address, u64::MAX); + 
let result = provider.get::(key).unwrap(); + assert!(result.is_some()); + let blocks: Vec = result.unwrap().iter().collect(); + assert_eq!(blocks, (0..=5).collect::>()); + } + + #[test] + fn test_unwind_account_history_to_block_zero() { + let temp_dir = TempDir::new().unwrap(); + let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap(); + + let address = Address::from([0x42; 20]); + + // Add blocks 0-5 (including block 0) + let mut batch = provider.batch(); + batch.append_account_history_shard(address, 0..=5).unwrap(); + batch.commit().unwrap(); + + // Unwind to block 0 (keep only block 0, remove 1-5) + // This simulates the caller doing: unwind_to = min_block.checked_sub(1) where min_block = 1 + let mut batch = provider.batch(); + batch.unwind_account_history_to(address, 0).unwrap(); + batch.commit().unwrap(); + + // Verify only block 0 remains + let key = ShardedKey::new(address, u64::MAX); + let result = provider.get::(key).unwrap(); + assert!(result.is_some()); + let blocks: Vec = result.unwrap().iter().collect(); + assert_eq!(blocks, vec![0]); + } + + #[test] + fn test_unwind_account_history_to_multi_shard() { + let temp_dir = TempDir::new().unwrap(); + let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap(); + + let address = Address::from([0x42; 20]); + + // Create multiple shards by adding more than NUM_OF_INDICES_IN_SHARD entries + // For testing, we'll manually create shards with specific keys + let mut batch = provider.batch(); + + // First shard: blocks 1-50, keyed by 50 + let shard1 = BlockNumberList::new_pre_sorted(1..=50); + batch.put::(ShardedKey::new(address, 50), &shard1).unwrap(); + + // Second shard: blocks 51-100, keyed by MAX (sentinel) + let shard2 = BlockNumberList::new_pre_sorted(51..=100); + batch.put::(ShardedKey::new(address, u64::MAX), &shard2).unwrap(); + + batch.commit().unwrap(); + + // Verify we have 2 shards + let shards = provider.account_history_shards(address).unwrap(); + assert_eq!(shards.len(), 2); + + // Unwind to block 75 (keep 1-75, remove 76-100) + let mut batch = provider.batch(); + batch.unwind_account_history_to(address, 75).unwrap(); + batch.commit().unwrap(); + + // Verify: shard1 should be untouched, shard2 should be truncated + let shards = provider.account_history_shards(address).unwrap(); + assert_eq!(shards.len(), 2); + + // First shard unchanged + assert_eq!(shards[0].0.highest_block_number, 50); + assert_eq!(shards[0].1.iter().collect::>(), (1..=50).collect::>()); + + // Second shard truncated and re-keyed to MAX + assert_eq!(shards[1].0.highest_block_number, u64::MAX); + assert_eq!(shards[1].1.iter().collect::>(), (51..=75).collect::>()); + } + + #[test] + fn test_unwind_account_history_to_multi_shard_boundary_empty() { + let temp_dir = TempDir::new().unwrap(); + let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap(); + + let address = Address::from([0x42; 20]); + + // Create two shards + let mut batch = provider.batch(); + + // First shard: blocks 1-50, keyed by 50 + let shard1 = BlockNumberList::new_pre_sorted(1..=50); + batch.put::(ShardedKey::new(address, 50), &shard1).unwrap(); + + // Second shard: blocks 75-100, keyed by MAX + let shard2 = BlockNumberList::new_pre_sorted(75..=100); + batch.put::(ShardedKey::new(address, u64::MAX), &shard2).unwrap(); + + batch.commit().unwrap(); + + // Unwind to block 60 (removes all of shard2 since 75 > 60, promotes shard1 to MAX) + let mut batch = provider.batch(); + 
batch.unwind_account_history_to(address, 60).unwrap(); + batch.commit().unwrap(); + + // Verify: only shard1 remains, now keyed as MAX + let shards = provider.account_history_shards(address).unwrap(); + assert_eq!(shards.len(), 1); + assert_eq!(shards[0].0.highest_block_number, u64::MAX); + assert_eq!(shards[0].1.iter().collect::>(), (1..=50).collect::>()); + } + + #[test] + fn test_account_history_shards_iterator() { + let temp_dir = TempDir::new().unwrap(); + let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap(); + + let address = Address::from([0x42; 20]); + let other_address = Address::from([0x43; 20]); + + // Add data for two addresses + let mut batch = provider.batch(); + batch.append_account_history_shard(address, 0..=5).unwrap(); + batch.append_account_history_shard(other_address, 10..=15).unwrap(); + batch.commit().unwrap(); + + // Query shards for first address only + let shards = provider.account_history_shards(address).unwrap(); + assert_eq!(shards.len(), 1); + assert_eq!(shards[0].0.key, address); + + // Query shards for second address only + let shards = provider.account_history_shards(other_address).unwrap(); + assert_eq!(shards.len(), 1); + assert_eq!(shards[0].0.key, other_address); + + // Query shards for non-existent address + let non_existent = Address::from([0x99; 20]); + let shards = provider.account_history_shards(non_existent).unwrap(); + assert!(shards.is_empty()); + } + + #[test] + fn test_clear_account_history() { + let temp_dir = TempDir::new().unwrap(); + let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap(); + + let address = Address::from([0x42; 20]); + + // Add blocks 0-10 + let mut batch = provider.batch(); + batch.append_account_history_shard(address, 0..=10).unwrap(); + batch.commit().unwrap(); + + // Clear all history (simulates unwind from block 0) + let mut batch = provider.batch(); + batch.clear_account_history(address).unwrap(); + batch.commit().unwrap(); + + // Verify no data remains + let shards = provider.account_history_shards(address).unwrap(); + assert!(shards.is_empty(), "All shards should be deleted"); + } + + #[test] + fn test_unwind_non_sentinel_boundary() { + let temp_dir = TempDir::new().unwrap(); + let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap(); + + let address = Address::from([0x42; 20]); + + // Create three shards with non-sentinel boundary + let mut batch = provider.batch(); + + // Shard 1: blocks 1-50, keyed by 50 + let shard1 = BlockNumberList::new_pre_sorted(1..=50); + batch.put::(ShardedKey::new(address, 50), &shard1).unwrap(); + + // Shard 2: blocks 51-100, keyed by 100 (non-sentinel, will be boundary) + let shard2 = BlockNumberList::new_pre_sorted(51..=100); + batch.put::(ShardedKey::new(address, 100), &shard2).unwrap(); + + // Shard 3: blocks 101-150, keyed by MAX (will be deleted) + let shard3 = BlockNumberList::new_pre_sorted(101..=150); + batch.put::(ShardedKey::new(address, u64::MAX), &shard3).unwrap(); + + batch.commit().unwrap(); + + // Verify 3 shards + let shards = provider.account_history_shards(address).unwrap(); + assert_eq!(shards.len(), 3); + + // Unwind to block 75 (truncates shard2, deletes shard3) + let mut batch = provider.batch(); + batch.unwind_account_history_to(address, 75).unwrap(); + batch.commit().unwrap(); + + // Verify: shard1 unchanged, shard2 truncated and re-keyed to MAX, shard3 deleted + let shards = provider.account_history_shards(address).unwrap(); + assert_eq!(shards.len(), 2); + 
+ // First shard unchanged + assert_eq!(shards[0].0.highest_block_number, 50); + assert_eq!(shards[0].1.iter().collect::>(), (1..=50).collect::>()); + + // Second shard truncated and re-keyed to MAX + assert_eq!(shards[1].0.highest_block_number, u64::MAX); + assert_eq!(shards[1].1.iter().collect::>(), (51..=75).collect::>()); + } } diff --git a/crates/storage/provider/src/traits/rocksdb_provider.rs b/crates/storage/provider/src/traits/rocksdb_provider.rs index 3394fa16f6..b4abacd86e 100644 --- a/crates/storage/provider/src/traits/rocksdb_provider.rs +++ b/crates/storage/provider/src/traits/rocksdb_provider.rs @@ -1,4 +1,7 @@ -use crate::{either_writer::RocksTxRefArg, providers::RocksDBProvider}; +use crate::{ + either_writer::{RawRocksDBBatch, RocksBatchArg, RocksTxRefArg}, + providers::RocksDBProvider, +}; use reth_storage_errors::provider::ProviderResult; /// `RocksDB` provider factory. @@ -31,4 +34,28 @@ pub trait RocksDBProviderFactory { #[cfg(not(all(unix, feature = "rocksdb")))] f(()) } + + /// Executes a closure with a `RocksDB` batch, automatically registering it for commit. + /// + /// This helper encapsulates all the cfg-gated `RocksDB` batch handling. + fn with_rocksdb_batch(&self, f: F) -> ProviderResult + where + F: FnOnce(RocksBatchArg<'_>) -> ProviderResult<(R, Option)>, + { + #[cfg(all(unix, feature = "rocksdb"))] + { + let rocksdb = self.rocksdb_provider(); + let batch = rocksdb.batch(); + let (result, raw_batch) = f(batch)?; + if let Some(b) = raw_batch { + self.set_pending_rocksdb_batch(b); + } + Ok(result) + } + #[cfg(not(all(unix, feature = "rocksdb")))] + { + let (result, _) = f(())?; + Ok(result) + } + } }
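For orientation, the call-site shape for `with_rocksdb_batch` is the one used by `IndexAccountHistoryStage::execute` earlier in this patch: the closure receives the cfg-gated batch argument, routes writes through `EitherWriter`, and returns the raw batch so the helper can register it as pending. A sketch of that pattern, assuming a `provider` satisfying the stage's bounds and a prepared ETL `collector`; it uses only calls introduced by this patch:

    provider.with_rocksdb_batch(|rocksdb_batch| {
        // EitherWriter decides between the MDBX cursor and the RocksDB batch based on the
        // cached storage settings.
        let mut writer = EitherWriter::new_accounts_history(provider, rocksdb_batch)?;
        load_account_history(collector, first_sync, &mut writer)
            .map_err(|e| reth_provider::ProviderError::other(Box::new(e)))?;
        // Hand back any raw RocksDB batch so it is queued for commit with the provider.
        Ok(((), writer.into_raw_rocksdb_batch()))
    })?;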