From 13c32625bc368f53090b72966f4833870e508d5f Mon Sep 17 00:00:00 2001 From: YK Date: Sat, 17 Jan 2026 01:44:43 +0800 Subject: [PATCH] feat(storage): add EitherReader for routing history queries to MDBX or RocksDB (#21063) Co-authored-by: joshieDo <93316087+joshieDo@users.noreply.github.com> --- .../src/providers/database/provider.rs | 21 +- crates/storage/provider/src/providers/mod.rs | 4 +- .../src/providers/rocksdb/invariants.rs | 170 +++++++-- .../src/providers/rocksdb/provider.rs | 358 +++++++++++++++--- .../src/providers/state/historical.rs | 144 ++++--- .../provider/src/traits/rocksdb_provider.rs | 20 +- 6 files changed, 555 insertions(+), 162 deletions(-) diff --git a/crates/storage/provider/src/providers/database/provider.rs b/crates/storage/provider/src/providers/database/provider.rs index 3ec5bd28cb..af644a47a9 100644 --- a/crates/storage/provider/src/providers/database/provider.rs +++ b/crates/storage/provider/src/providers/database/provider.rs @@ -16,7 +16,7 @@ use crate::{ HeaderSyncGapProvider, HistoricalStateProvider, HistoricalStateProviderRef, HistoryWriter, LatestStateProvider, LatestStateProviderRef, OriginalValuesKnown, ProviderError, PruneCheckpointReader, PruneCheckpointWriter, RawRocksDBBatch, RevertsInit, RocksBatchArg, - RocksDBProviderFactory, RocksTxRefArg, StageCheckpointReader, StateProviderBox, StateWriter, + RocksDBProviderFactory, StageCheckpointReader, StateProviderBox, StateWriter, StaticFileProviderFactory, StatsReader, StorageReader, StorageTrieWriter, TransactionVariant, TransactionsProvider, TransactionsProviderExt, TrieWriter, }; @@ -889,25 +889,6 @@ impl DatabaseProvider { pub fn chain_spec(&self) -> &N::ChainSpec { &self.chain_spec } - - /// Executes a closure with a `RocksDB` transaction for reading. - /// - /// This helper encapsulates all the cfg-gated `RocksDB` transaction handling for reads. - fn with_rocksdb_tx(&self, f: F) -> ProviderResult - where - F: FnOnce(RocksTxRefArg<'_>) -> ProviderResult, - { - #[cfg(all(unix, feature = "rocksdb"))] - let rocksdb = self.rocksdb_provider(); - #[cfg(all(unix, feature = "rocksdb"))] - let rocksdb_tx = rocksdb.tx(); - #[cfg(all(unix, feature = "rocksdb"))] - let rocksdb_tx_ref = &rocksdb_tx; - #[cfg(not(all(unix, feature = "rocksdb")))] - let rocksdb_tx_ref = (); - - f(rocksdb_tx_ref) - } } impl DatabaseProvider { diff --git a/crates/storage/provider/src/providers/mod.rs b/crates/storage/provider/src/providers/mod.rs index 14f112a27b..1047e58c06 100644 --- a/crates/storage/provider/src/providers/mod.rs +++ b/crates/storage/provider/src/providers/mod.rs @@ -16,8 +16,8 @@ pub use static_file::{ mod state; pub use state::{ historical::{ - history_info, needs_prev_shard_check, HistoricalStateProvider, HistoricalStateProviderRef, - HistoryInfo, LowestAvailableBlocks, + compute_history_rank, history_info, needs_prev_shard_check, HistoricalStateProvider, + HistoricalStateProviderRef, HistoryInfo, LowestAvailableBlocks, }, latest::{LatestStateProvider, LatestStateProviderRef}, overlay::{OverlayStateProvider, OverlayStateProviderFactory}, diff --git a/crates/storage/provider/src/providers/rocksdb/invariants.rs b/crates/storage/provider/src/providers/rocksdb/invariants.rs index 7a5c5f9db3..75be8ca5ad 100644 --- a/crates/storage/provider/src/providers/rocksdb/invariants.rs +++ b/crates/storage/provider/src/providers/rocksdb/invariants.rs @@ -164,16 +164,7 @@ impl RocksDBProvider { self.prune_transaction_hash_numbers_in_range(provider, 0..=highest_tx)?; } (None, None) => { - // Both MDBX and static files are empty. 
- // If checkpoint says we should have data, that's an inconsistency. - if checkpoint > 0 { - tracing::warn!( - target: "reth::providers::rocksdb", - checkpoint, - "Checkpoint set but no transaction data exists, unwind needed" - ); - return Ok(Some(0)); - } + // Both MDBX and static files are empty, nothing to check. } } @@ -263,16 +254,27 @@ impl RocksDBProvider { } // Find the max highest_block_number (excluding u64::MAX sentinel) across all - // entries + // entries. Also track if we found any non-sentinel entries. let mut max_highest_block = 0u64; + let mut found_non_sentinel = false; for result in self.iter::()? { let (key, _) = result?; let highest = key.sharded_key.highest_block_number; - if highest != u64::MAX && highest > max_highest_block { - max_highest_block = highest; + if highest != u64::MAX { + found_non_sentinel = true; + if highest > max_highest_block { + max_highest_block = highest; + } } } + // If all entries are sentinel entries (u64::MAX), treat as first-run scenario. + // This means no completed shards exist (only sentinel shards with + // highest_block_number=u64::MAX), so no actual history has been indexed. + if !found_non_sentinel { + return Ok(None); + } + // If any entry has highest_block > checkpoint, prune excess if max_highest_block > checkpoint { tracing::info!( @@ -296,11 +298,7 @@ impl RocksDBProvider { Ok(None) } None => { - // Empty RocksDB table - if checkpoint > 0 { - // Stage says we should have data but we don't - return Ok(Some(0)); - } + // Empty RocksDB table, nothing to check. Ok(None) } } @@ -377,16 +375,27 @@ impl RocksDBProvider { } // Find the max highest_block_number (excluding u64::MAX sentinel) across all - // entries + // entries. Also track if we found any non-sentinel entries. let mut max_highest_block = 0u64; + let mut found_non_sentinel = false; for result in self.iter::()? { let (key, _) = result?; let highest = key.highest_block_number; - if highest != u64::MAX && highest > max_highest_block { - max_highest_block = highest; + if highest != u64::MAX { + found_non_sentinel = true; + if highest > max_highest_block { + max_highest_block = highest; + } } } + // If all entries are sentinel entries (u64::MAX), treat as first-run scenario. + // This means no completed shards exist (only sentinel shards with + // highest_block_number=u64::MAX), so no actual history has been indexed. + if !found_non_sentinel { + return Ok(None); + } + // If any entry has highest_block > checkpoint, prune excess if max_highest_block > checkpoint { tracing::info!( @@ -413,11 +422,7 @@ impl RocksDBProvider { Ok(None) } None => { - // Empty RocksDB table - if checkpoint > 0 { - // Stage says we should have data but we don't - return Ok(Some(0)); - } + // Empty RocksDB table, nothing to check. Ok(None) } } @@ -542,7 +547,7 @@ mod tests { } #[test] - fn test_check_consistency_empty_rocksdb_with_checkpoint_needs_unwind() { + fn test_check_consistency_empty_rocksdb_with_checkpoint_is_first_run() { let temp_dir = TempDir::new().unwrap(); let rocksdb = RocksDBBuilder::new(temp_dir.path()) .with_table::() @@ -566,10 +571,10 @@ mod tests { let provider = factory.database_provider_ro().unwrap(); - // RocksDB is empty but checkpoint says block 100 was processed - // This means RocksDB is missing data and we need to unwind to rebuild + // RocksDB is empty but checkpoint says block 100 was processed. + // This is treated as a first-run/migration scenario - no unwind needed. 
let result = rocksdb.check_consistency(&provider).unwrap(); - assert_eq!(result, Some(0), "Should require unwind to block 0 to rebuild RocksDB"); + assert_eq!(result, None, "Empty data with checkpoint is treated as first run"); } #[test] @@ -650,7 +655,7 @@ mod tests { } #[test] - fn test_check_consistency_storages_history_empty_with_checkpoint_needs_unwind() { + fn test_check_consistency_storages_history_empty_with_checkpoint_is_first_run() { let temp_dir = TempDir::new().unwrap(); let rocksdb = RocksDBBuilder::new(temp_dir.path()) .with_table::() @@ -674,9 +679,10 @@ mod tests { let provider = factory.database_provider_ro().unwrap(); - // RocksDB is empty but checkpoint says block 100 was processed + // RocksDB is empty but checkpoint says block 100 was processed. + // This is treated as a first-run/migration scenario - no unwind needed. let result = rocksdb.check_consistency(&provider).unwrap(); - assert_eq!(result, Some(0), "Should require unwind to block 0 to rebuild StoragesHistory"); + assert_eq!(result, None, "Empty RocksDB with checkpoint is treated as first run"); } #[test] @@ -978,6 +984,97 @@ mod tests { ); } + #[test] + fn test_check_consistency_storages_history_sentinel_only_with_checkpoint_is_first_run() { + let temp_dir = TempDir::new().unwrap(); + let rocksdb = RocksDBBuilder::new(temp_dir.path()) + .with_table::() + .build() + .unwrap(); + + // Insert ONLY sentinel entries (highest_block_number = u64::MAX) + // This simulates a scenario where history tracking started but no shards were completed + let key_sentinel_1 = StorageShardedKey::new(Address::ZERO, B256::ZERO, u64::MAX); + let key_sentinel_2 = StorageShardedKey::new(Address::random(), B256::random(), u64::MAX); + let block_list = BlockNumberList::new_pre_sorted([10, 20, 30]); + rocksdb.put::(key_sentinel_1, &block_list).unwrap(); + rocksdb.put::(key_sentinel_2, &block_list).unwrap(); + + // Verify entries exist (not empty table) + assert!(rocksdb.first::().unwrap().is_some()); + + // Create a test provider factory for MDBX + let factory = create_test_provider_factory(); + factory.set_storage_settings_cache( + StorageSettings::legacy().with_storages_history_in_rocksdb(true), + ); + + // Set a checkpoint indicating we should have processed up to block 100 + { + let provider = factory.database_provider_rw().unwrap(); + provider + .save_stage_checkpoint(StageId::IndexStorageHistory, StageCheckpoint::new(100)) + .unwrap(); + provider.commit().unwrap(); + } + + let provider = factory.database_provider_ro().unwrap(); + + // RocksDB has only sentinel entries (no completed shards) but checkpoint is set. + // This is treated as a first-run/migration scenario - no unwind needed. 
+ let result = rocksdb.check_consistency(&provider).unwrap(); + assert_eq!( + result, None, + "Sentinel-only entries with checkpoint should be treated as first run" + ); + } + + #[test] + fn test_check_consistency_accounts_history_sentinel_only_with_checkpoint_is_first_run() { + use reth_db_api::models::ShardedKey; + + let temp_dir = TempDir::new().unwrap(); + let rocksdb = RocksDBBuilder::new(temp_dir.path()) + .with_table::() + .build() + .unwrap(); + + // Insert ONLY sentinel entries (highest_block_number = u64::MAX) + let key_sentinel_1 = ShardedKey::new(Address::ZERO, u64::MAX); + let key_sentinel_2 = ShardedKey::new(Address::random(), u64::MAX); + let block_list = BlockNumberList::new_pre_sorted([10, 20, 30]); + rocksdb.put::(key_sentinel_1, &block_list).unwrap(); + rocksdb.put::(key_sentinel_2, &block_list).unwrap(); + + // Verify entries exist (not empty table) + assert!(rocksdb.first::().unwrap().is_some()); + + // Create a test provider factory for MDBX + let factory = create_test_provider_factory(); + factory.set_storage_settings_cache( + StorageSettings::legacy().with_account_history_in_rocksdb(true), + ); + + // Set a checkpoint indicating we should have processed up to block 100 + { + let provider = factory.database_provider_rw().unwrap(); + provider + .save_stage_checkpoint(StageId::IndexAccountHistory, StageCheckpoint::new(100)) + .unwrap(); + provider.commit().unwrap(); + } + + let provider = factory.database_provider_ro().unwrap(); + + // RocksDB has only sentinel entries (no completed shards) but checkpoint is set. + // This is treated as a first-run/migration scenario - no unwind needed. + let result = rocksdb.check_consistency(&provider).unwrap(); + assert_eq!( + result, None, + "Sentinel-only entries with checkpoint should be treated as first run" + ); + } + #[test] fn test_check_consistency_storages_history_behind_checkpoint_single_entry() { use reth_db_api::models::storage_sharded_key::StorageShardedKey; @@ -1135,7 +1232,7 @@ mod tests { } #[test] - fn test_check_consistency_accounts_history_empty_with_checkpoint_needs_unwind() { + fn test_check_consistency_accounts_history_empty_with_checkpoint_is_first_run() { let temp_dir = TempDir::new().unwrap(); let rocksdb = RocksDBBuilder::new(temp_dir.path()) .with_table::() @@ -1159,9 +1256,10 @@ mod tests { let provider = factory.database_provider_ro().unwrap(); - // RocksDB is empty but checkpoint says block 100 was processed + // RocksDB is empty but checkpoint says block 100 was processed. + // This is treated as a first-run/migration scenario - no unwind needed. 
let result = rocksdb.check_consistency(&provider).unwrap(); - assert_eq!(result, Some(0), "Should require unwind to block 0 to rebuild AccountsHistory"); + assert_eq!(result, None, "Empty RocksDB with checkpoint is treated as first run"); } #[test] diff --git a/crates/storage/provider/src/providers/rocksdb/provider.rs b/crates/storage/provider/src/providers/rocksdb/provider.rs index 626e73ad05..cc427fcb8b 100644 --- a/crates/storage/provider/src/providers/rocksdb/provider.rs +++ b/crates/storage/provider/src/providers/rocksdb/provider.rs @@ -1,11 +1,15 @@ use super::metrics::{RocksDBMetrics, RocksDBOperation}; -use crate::providers::{needs_prev_shard_check, HistoryInfo}; +use crate::providers::{compute_history_rank, needs_prev_shard_check, HistoryInfo}; use alloy_consensus::transaction::TxHashRef; use alloy_primitives::{Address, BlockNumber, TxNumber, B256}; +use itertools::Itertools; use parking_lot::Mutex; use reth_chain_state::ExecutedBlock; use reth_db_api::{ - models::{storage_sharded_key::StorageShardedKey, ShardedKey, StorageSettings}, + models::{ + sharded_key::NUM_OF_INDICES_IN_SHARD, storage_sharded_key::StorageShardedKey, ShardedKey, + StorageSettings, + }, table::{Compress, Decode, Decompress, Encode, Table}, tables, BlockNumberList, DatabaseError, }; @@ -592,10 +596,10 @@ impl RocksDBProvider { account_history.entry(address).or_default().push(block_number); } } - for (address, blocks) in account_history { - let key = ShardedKey::new(address, u64::MAX); - let value = BlockNumberList::new_pre_sorted(blocks); - batch.put::(key, &value)?; + + // Write account history using proper shard append logic + for (address, indices) in account_history { + batch.append_account_history_shard(address, indices)?; } ctx.pending_batches.lock().push(batch.into_inner()); Ok(()) @@ -620,10 +624,10 @@ impl RocksDBProvider { } } } - for ((address, slot), blocks) in storage_history { - let key = StorageShardedKey::new(address, slot, u64::MAX); - let value = BlockNumberList::new_pre_sorted(blocks); - batch.put::(key, &value)?; + + // Write storage history using proper shard append logic + for ((address, slot), indices) in storage_history { + batch.append_storage_history_shard(address, slot, indices)?; } ctx.pending_batches.lock().push(batch.into_inner()); Ok(()) @@ -714,6 +718,129 @@ impl<'a> RocksDBBatch<'a> { pub fn into_inner(self) -> WriteBatchWithTransaction { self.inner } + + /// Appends indices to an account history shard with proper shard management. + /// + /// Loads the existing shard (if any), appends new indices, and rechunks into + /// multiple shards if needed (respecting `NUM_OF_INDICES_IN_SHARD` limit). + /// + /// # Requirements + /// + /// - The `indices` MUST be strictly increasing and contain no duplicates. + /// - This method MUST only be called once per address per batch. The batch reads existing + /// shards from committed DB state, not from pending writes. Calling twice for the same + /// address will cause the second call to overwrite the first. 
+ pub fn append_account_history_shard( + &mut self, + address: Address, + indices: impl IntoIterator, + ) -> ProviderResult<()> { + let indices: Vec = indices.into_iter().collect(); + + if indices.is_empty() { + return Ok(()); + } + + debug_assert!( + indices.windows(2).all(|w| w[0] < w[1]), + "indices must be strictly increasing: {:?}", + indices + ); + + let last_key = ShardedKey::new(address, u64::MAX); + let last_shard_opt = self.provider.get::(last_key.clone())?; + let mut last_shard = last_shard_opt.unwrap_or_else(BlockNumberList::empty); + + last_shard.append(indices).map_err(ProviderError::other)?; + + // Fast path: all indices fit in one shard + if last_shard.len() <= NUM_OF_INDICES_IN_SHARD as u64 { + self.put::(last_key, &last_shard)?; + return Ok(()); + } + + // Slow path: rechunk into multiple shards + let chunks = last_shard.iter().chunks(NUM_OF_INDICES_IN_SHARD); + let mut chunks_peekable = chunks.into_iter().peekable(); + + while let Some(chunk) = chunks_peekable.next() { + let shard = BlockNumberList::new_pre_sorted(chunk); + let highest_block_number = if chunks_peekable.peek().is_some() { + shard.iter().next_back().expect("`chunks` does not return empty list") + } else { + u64::MAX + }; + + self.put::( + ShardedKey::new(address, highest_block_number), + &shard, + )?; + } + + Ok(()) + } + + /// Appends indices to a storage history shard with proper shard management. + /// + /// Loads the existing shard (if any), appends new indices, and rechunks into + /// multiple shards if needed (respecting `NUM_OF_INDICES_IN_SHARD` limit). + /// + /// # Requirements + /// + /// - The `indices` MUST be strictly increasing and contain no duplicates. + /// - This method MUST only be called once per (address, `storage_key`) pair per batch. The + /// batch reads existing shards from committed DB state, not from pending writes. Calling + /// twice for the same key will cause the second call to overwrite the first. + pub fn append_storage_history_shard( + &mut self, + address: Address, + storage_key: B256, + indices: impl IntoIterator, + ) -> ProviderResult<()> { + let indices: Vec = indices.into_iter().collect(); + + if indices.is_empty() { + return Ok(()); + } + + debug_assert!( + indices.windows(2).all(|w| w[0] < w[1]), + "indices must be strictly increasing: {:?}", + indices + ); + + let last_key = StorageShardedKey::last(address, storage_key); + let last_shard_opt = self.provider.get::(last_key.clone())?; + let mut last_shard = last_shard_opt.unwrap_or_else(BlockNumberList::empty); + + last_shard.append(indices).map_err(ProviderError::other)?; + + // Fast path: all indices fit in one shard + if last_shard.len() <= NUM_OF_INDICES_IN_SHARD as u64 { + self.put::(last_key, &last_shard)?; + return Ok(()); + } + + // Slow path: rechunk into multiple shards + let chunks = last_shard.iter().chunks(NUM_OF_INDICES_IN_SHARD); + let mut chunks_peekable = chunks.into_iter().peekable(); + + while let Some(chunk) = chunks_peekable.next() { + let shard = BlockNumberList::new_pre_sorted(chunk); + let highest_block_number = if chunks_peekable.peek().is_some() { + shard.iter().next_back().expect("`chunks` does not return empty list") + } else { + u64::MAX + }; + + self.put::( + StorageShardedKey::new(address, storage_key, highest_block_number), + &shard, + )?; + } + + Ok(()) + } } /// `RocksDB` transaction wrapper providing MDBX-like semantics. @@ -901,6 +1028,16 @@ impl<'db> RocksTx<'db> { where T: Table, { + // History may be pruned if a lowest available block is set. 
+ let is_maybe_pruned = lowest_available_block_number.is_some(); + let fallback = || { + Ok(if is_maybe_pruned { + HistoryInfo::MaybeInPlainState + } else { + HistoryInfo::NotYetWritten + }) + }; + let cf = self.provider.0.db.cf_handle(T::NAME).ok_or_else(|| { ProviderError::Database(DatabaseError::Other(format!( "column family not found: {}", @@ -918,53 +1055,28 @@ impl<'db> RocksTx<'db> { if !iter.valid() { // No shard found at or after target block. - return if lowest_available_block_number.is_some() { - // The key may have been written, but due to pruning we may not have changesets - // and history, so we need to make a plain state lookup. - Ok(HistoryInfo::MaybeInPlainState) - } else { - // The key has not been written to at all. - Ok(HistoryInfo::NotYetWritten) - }; + // + // (MaybeInPlainState) The key may have been written, but due to pruning we may not have + // changesets and history, so we need to make a plain state lookup. + // (HistoryInfo::NotYetWritten) The key has not been written to at all. + return fallback(); } // Check if the found key matches our target entity. let Some(key_bytes) = iter.key() else { - return if lowest_available_block_number.is_some() { - Ok(HistoryInfo::MaybeInPlainState) - } else { - Ok(HistoryInfo::NotYetWritten) - }; + return fallback(); }; if !key_matches(key_bytes)? { // The found key is for a different entity. - return if lowest_available_block_number.is_some() { - Ok(HistoryInfo::MaybeInPlainState) - } else { - Ok(HistoryInfo::NotYetWritten) - }; + return fallback(); } // Decompress the block list for this shard. let Some(value_bytes) = iter.value() else { - return if lowest_available_block_number.is_some() { - Ok(HistoryInfo::MaybeInPlainState) - } else { - Ok(HistoryInfo::NotYetWritten) - }; + return fallback(); }; let chunk = BlockNumberList::decompress(value_bytes)?; - - // Get the rank of the first entry before or equal to our block. - let mut rank = chunk.rank(block_number); - - // Adjust the rank, so that we have the rank of the first entry strictly before our - // block (not equal to it). - if rank.checked_sub(1).and_then(|r| chunk.select(r)) == Some(block_number) { - rank -= 1; - } - - let found_block = chunk.select(rank); + let (rank, found_block) = compute_history_rank(&chunk, block_number); // Lazy check for previous shard - only called when needed. 
// If we can step to a previous shard for this same key, history already exists, @@ -1103,7 +1215,11 @@ mod tests { use crate::providers::HistoryInfo; use alloy_primitives::{Address, TxHash, B256}; use reth_db_api::{ - models::{sharded_key::ShardedKey, storage_sharded_key::StorageShardedKey, IntegerList}, + models::{ + sharded_key::{ShardedKey, NUM_OF_INDICES_IN_SHARD}, + storage_sharded_key::StorageShardedKey, + IntegerList, + }, table::Table, tables, }; @@ -1452,4 +1568,156 @@ mod tests { tx.rollback().unwrap(); } + + #[test] + fn test_account_history_shard_split_at_boundary() { + let temp_dir = TempDir::new().unwrap(); + let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap(); + + let address = Address::from([0x42; 20]); + let limit = NUM_OF_INDICES_IN_SHARD; + + // Add exactly NUM_OF_INDICES_IN_SHARD + 1 indices to trigger a split + let indices: Vec = (0..=(limit as u64)).collect(); + let mut batch = provider.batch(); + batch.append_account_history_shard(address, indices).unwrap(); + batch.commit().unwrap(); + + // Should have 2 shards: one completed shard and one sentinel shard + let completed_key = ShardedKey::new(address, (limit - 1) as u64); + let sentinel_key = ShardedKey::new(address, u64::MAX); + + let completed_shard = provider.get::(completed_key).unwrap(); + let sentinel_shard = provider.get::(sentinel_key).unwrap(); + + assert!(completed_shard.is_some(), "completed shard should exist"); + assert!(sentinel_shard.is_some(), "sentinel shard should exist"); + + let completed_shard = completed_shard.unwrap(); + let sentinel_shard = sentinel_shard.unwrap(); + + assert_eq!(completed_shard.len(), limit as u64, "completed shard should be full"); + assert_eq!(sentinel_shard.len(), 1, "sentinel shard should have 1 element"); + } + + #[test] + fn test_account_history_multiple_shard_splits() { + let temp_dir = TempDir::new().unwrap(); + let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap(); + + let address = Address::from([0x43; 20]); + let limit = NUM_OF_INDICES_IN_SHARD; + + // First batch: add NUM_OF_INDICES_IN_SHARD indices + let first_batch_indices: Vec = (0..limit as u64).collect(); + let mut batch = provider.batch(); + batch.append_account_history_shard(address, first_batch_indices).unwrap(); + batch.commit().unwrap(); + + // Should have just a sentinel shard (exactly at limit, not over) + let sentinel_key = ShardedKey::new(address, u64::MAX); + let shard = provider.get::(sentinel_key.clone()).unwrap(); + assert!(shard.is_some()); + assert_eq!(shard.unwrap().len(), limit as u64); + + // Second batch: add another NUM_OF_INDICES_IN_SHARD + 1 indices (causing 2 more shards) + let second_batch_indices: Vec = (limit as u64..=(2 * limit) as u64).collect(); + let mut batch = provider.batch(); + batch.append_account_history_shard(address, second_batch_indices).unwrap(); + batch.commit().unwrap(); + + // Now we should have: 2 completed shards + 1 sentinel shard + let first_completed = ShardedKey::new(address, (limit - 1) as u64); + let second_completed = ShardedKey::new(address, (2 * limit - 1) as u64); + + assert!( + provider.get::(first_completed).unwrap().is_some(), + "first completed shard should exist" + ); + assert!( + provider.get::(second_completed).unwrap().is_some(), + "second completed shard should exist" + ); + assert!( + provider.get::(sentinel_key).unwrap().is_some(), + "sentinel shard should exist" + ); + } + + #[test] + fn test_storage_history_shard_split_at_boundary() { + let temp_dir = 
TempDir::new().unwrap(); + let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap(); + + let address = Address::from([0x44; 20]); + let slot = B256::from([0x55; 32]); + let limit = NUM_OF_INDICES_IN_SHARD; + + // Add exactly NUM_OF_INDICES_IN_SHARD + 1 indices to trigger a split + let indices: Vec = (0..=(limit as u64)).collect(); + let mut batch = provider.batch(); + batch.append_storage_history_shard(address, slot, indices).unwrap(); + batch.commit().unwrap(); + + // Should have 2 shards: one completed shard and one sentinel shard + let completed_key = StorageShardedKey::new(address, slot, (limit - 1) as u64); + let sentinel_key = StorageShardedKey::new(address, slot, u64::MAX); + + let completed_shard = provider.get::(completed_key).unwrap(); + let sentinel_shard = provider.get::(sentinel_key).unwrap(); + + assert!(completed_shard.is_some(), "completed shard should exist"); + assert!(sentinel_shard.is_some(), "sentinel shard should exist"); + + let completed_shard = completed_shard.unwrap(); + let sentinel_shard = sentinel_shard.unwrap(); + + assert_eq!(completed_shard.len(), limit as u64, "completed shard should be full"); + assert_eq!(sentinel_shard.len(), 1, "sentinel shard should have 1 element"); + } + + #[test] + fn test_storage_history_multiple_shard_splits() { + let temp_dir = TempDir::new().unwrap(); + let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap(); + + let address = Address::from([0x46; 20]); + let slot = B256::from([0x57; 32]); + let limit = NUM_OF_INDICES_IN_SHARD; + + // First batch: add NUM_OF_INDICES_IN_SHARD indices + let first_batch_indices: Vec = (0..limit as u64).collect(); + let mut batch = provider.batch(); + batch.append_storage_history_shard(address, slot, first_batch_indices).unwrap(); + batch.commit().unwrap(); + + // Should have just a sentinel shard (exactly at limit, not over) + let sentinel_key = StorageShardedKey::new(address, slot, u64::MAX); + let shard = provider.get::(sentinel_key.clone()).unwrap(); + assert!(shard.is_some()); + assert_eq!(shard.unwrap().len(), limit as u64); + + // Second batch: add another NUM_OF_INDICES_IN_SHARD + 1 indices (causing 2 more shards) + let second_batch_indices: Vec = (limit as u64..=(2 * limit) as u64).collect(); + let mut batch = provider.batch(); + batch.append_storage_history_shard(address, slot, second_batch_indices).unwrap(); + batch.commit().unwrap(); + + // Now we should have: 2 completed shards + 1 sentinel shard + let first_completed = StorageShardedKey::new(address, slot, (limit - 1) as u64); + let second_completed = StorageShardedKey::new(address, slot, (2 * limit - 1) as u64); + + assert!( + provider.get::(first_completed).unwrap().is_some(), + "first completed shard should exist" + ); + assert!( + provider.get::(second_completed).unwrap().is_some(), + "second completed shard should exist" + ); + assert!( + provider.get::(sentinel_key).unwrap().is_some(), + "sentinel shard should exist" + ); + } } diff --git a/crates/storage/provider/src/providers/state/historical.rs b/crates/storage/provider/src/providers/state/historical.rs index f9bc61c7eb..62c7489819 100644 --- a/crates/storage/provider/src/providers/state/historical.rs +++ b/crates/storage/provider/src/providers/state/historical.rs @@ -1,12 +1,11 @@ use crate::{ - AccountReader, BlockHashReader, ChangeSetReader, HashedPostStateProvider, ProviderError, - StateProvider, StateRootProvider, + AccountReader, BlockHashReader, ChangeSetReader, EitherReader, HashedPostStateProvider, + 
ProviderError, RocksDBProviderFactory, StateProvider, StateRootProvider, }; use alloy_eips::merge::EPOCH_SLOTS; use alloy_primitives::{Address, BlockNumber, Bytes, StorageKey, StorageValue, B256}; use reth_db_api::{ cursor::{DbCursorRO, DbDupCursorRO}, - models::{storage_sharded_key::StorageShardedKey, ShardedKey}, table::Table, tables, transaction::DbTx, @@ -14,7 +13,8 @@ use reth_db_api::{ }; use reth_primitives_traits::{Account, Bytecode}; use reth_storage_api::{ - BlockNumReader, BytecodeReader, DBProvider, StateProofProvider, StorageRootProvider, + BlockNumReader, BytecodeReader, DBProvider, NodePrimitivesProvider, StateProofProvider, + StorageRootProvider, StorageSettingsCache, }; use reth_storage_errors::provider::ProviderResult; use reth_trie::{ @@ -127,38 +127,47 @@ impl<'b, Provider: DBProvider + ChangeSetReader + BlockNumReader> Self { provider, block_number, lowest_available_blocks } } - /// Lookup an account in the `AccountsHistory` table - pub fn account_history_lookup(&self, address: Address) -> ProviderResult { + /// Lookup an account in the `AccountsHistory` table using `EitherReader`. + pub fn account_history_lookup(&self, address: Address) -> ProviderResult + where + Provider: StorageSettingsCache + RocksDBProviderFactory + NodePrimitivesProvider, + { if !self.lowest_available_blocks.is_account_history_available(self.block_number) { return Err(ProviderError::StateAtBlockPruned(self.block_number)) } - // history key to search IntegerList of block number changesets. - let history_key = ShardedKey::new(address, self.block_number); - self.history_info_lookup::( - history_key, - |key| key.key == address, - self.lowest_available_blocks.account_history_block_number, - ) + self.provider.with_rocksdb_tx(|rocks_tx_ref| { + let mut reader = EitherReader::new_accounts_history(self.provider, rocks_tx_ref)?; + reader.account_history_info( + address, + self.block_number, + self.lowest_available_blocks.account_history_block_number, + ) + }) } - /// Lookup a storage key in the `StoragesHistory` table + /// Lookup a storage key in the `StoragesHistory` table using `EitherReader`. pub fn storage_history_lookup( &self, address: Address, storage_key: StorageKey, - ) -> ProviderResult { + ) -> ProviderResult + where + Provider: StorageSettingsCache + RocksDBProviderFactory + NodePrimitivesProvider, + { if !self.lowest_available_blocks.is_storage_history_available(self.block_number) { return Err(ProviderError::StateAtBlockPruned(self.block_number)) } - // history key to search IntegerList of block number changesets. - let history_key = StorageShardedKey::new(address, storage_key, self.block_number); - self.history_info_lookup::( - history_key, - |key| key.address == address && key.sharded_key.key == storage_key, - self.lowest_available_blocks.storage_history_block_number, - ) + self.provider.with_rocksdb_tx(|rocks_tx_ref| { + let mut reader = EitherReader::new_storages_history(self.provider, rocks_tx_ref)?; + reader.storage_history_info( + address, + storage_key, + self.block_number, + self.lowest_available_blocks.storage_history_block_number, + ) + }) } /// Checks and returns `true` if distance to historical block exceeds the provided limit. @@ -204,25 +213,6 @@ impl<'b, Provider: DBProvider + ChangeSetReader + BlockNumReader> Ok(HashedStorage::from_reverts(self.tx(), address, self.block_number)?) 
} - fn history_info_lookup( - &self, - key: K, - key_filter: impl Fn(&K) -> bool, - lowest_available_block_number: Option, - ) -> ProviderResult - where - T: Table, - { - let mut cursor = self.tx().cursor_read::()?; - history_info::( - &mut cursor, - key, - self.block_number, - key_filter, - lowest_available_block_number, - ) - } - /// Set the lowest block number at which the account history is available. pub const fn with_lowest_available_account_history_block_number( mut self, @@ -248,8 +238,14 @@ impl HistoricalStateProviderRef<'_, Provi } } -impl AccountReader - for HistoricalStateProviderRef<'_, Provider> +impl< + Provider: DBProvider + + BlockNumReader + + ChangeSetReader + + StorageSettingsCache + + RocksDBProviderFactory + + NodePrimitivesProvider, + > AccountReader for HistoricalStateProviderRef<'_, Provider> { /// Get basic account information. fn basic_account(&self, address: &Address) -> ProviderResult> { @@ -404,8 +400,15 @@ impl HashedPostStateProvider for HistoricalStateProviderRef<'_, Provid } } -impl StateProvider - for HistoricalStateProviderRef<'_, Provider> +impl< + Provider: DBProvider + + BlockNumReader + + BlockHashReader + + ChangeSetReader + + StorageSettingsCache + + RocksDBProviderFactory + + NodePrimitivesProvider, + > StateProvider for HistoricalStateProviderRef<'_, Provider> { /// Get storage. fn storage( @@ -495,7 +498,7 @@ impl HistoricalStatePro } // Delegates all provider impls to [HistoricalStateProviderRef] -reth_storage_api::macros::delegate_provider_impls!(HistoricalStateProvider where [Provider: DBProvider + BlockNumReader + BlockHashReader + ChangeSetReader]); +reth_storage_api::macros::delegate_provider_impls!(HistoricalStateProvider where [Provider: DBProvider + BlockNumReader + BlockHashReader + ChangeSetReader + StorageSettingsCache + RocksDBProviderFactory + NodePrimitivesProvider]); /// Lowest blocks at which different parts of the state are available. /// They may be [Some] if pruning is enabled. @@ -525,6 +528,32 @@ impl LowestAvailableBlocks { } } +/// Computes the rank and finds the next modification block in a history shard. +/// +/// Given a `block_number`, this function returns: +/// - `rank`: The number of entries strictly before `block_number` in the shard +/// - `found_block`: The block number at position `rank` (i.e., the first block >= `block_number` +/// where a modification occurred), or `None` if `rank` is out of bounds +/// +/// The rank is adjusted when `block_number` exactly matches an entry in the shard, +/// so that `found_block` always returns the modification at or after the target. +/// +/// This logic is shared between MDBX cursor-based lookups and `RocksDB` iterator lookups. +#[inline] +pub fn compute_history_rank( + chunk: &reth_db_api::BlockNumberList, + block_number: BlockNumber, +) -> (u64, Option) { + let mut rank = chunk.rank(block_number); + // `rank(block_number)` returns count of entries <= block_number. + // We want the first entry >= block_number, so if block_number is in the shard, + // we need to step back one position to point at it (not past it). + if rank.checked_sub(1).and_then(|r| chunk.select(r)) == Some(block_number) { + rank -= 1; + } + (rank, chunk.select(rank)) +} + /// Checks if a previous shard lookup is needed to determine if we're before the first write. /// /// Returns `true` when `rank == 0` (first entry in shard) and the found block doesn't match @@ -557,16 +586,7 @@ where // index, the first chunk for the next key will be returned so we filter out chunks that // have a different key. 
if let Some(chunk) = cursor.seek(key)?.filter(|(k, _)| key_filter(k)).map(|x| x.1) { - // Get the rank of the first entry before or equal to our block. - let mut rank = chunk.rank(block_number); - - // Adjust the rank, so that we have the rank of the first entry strictly before our - // block (not equal to it). - if rank.checked_sub(1).and_then(|r| chunk.select(r)) == Some(block_number) { - rank -= 1; - } - - let found_block = chunk.select(rank); + let (rank, found_block) = compute_history_rank(&chunk, block_number); // If our block is before the first entry in the index chunk and this first entry // doesn't equal to our block, it might be before the first write ever. To check, we @@ -598,7 +618,8 @@ mod tests { use crate::{ providers::state::historical::{HistoryInfo, LowestAvailableBlocks}, test_utils::create_test_provider_factory, - AccountReader, HistoricalStateProvider, HistoricalStateProviderRef, StateProvider, + AccountReader, HistoricalStateProvider, HistoricalStateProviderRef, RocksDBProviderFactory, + StateProvider, }; use alloy_primitives::{address, b256, Address, B256, U256}; use reth_db_api::{ @@ -610,6 +631,7 @@ mod tests { use reth_primitives_traits::{Account, StorageEntry}; use reth_storage_api::{ BlockHashReader, BlockNumReader, ChangeSetReader, DBProvider, DatabaseProviderFactory, + NodePrimitivesProvider, StorageSettingsCache, }; use reth_storage_errors::provider::ProviderError; @@ -621,7 +643,13 @@ mod tests { const fn assert_state_provider() {} #[expect(dead_code)] const fn assert_historical_state_provider< - T: DBProvider + BlockNumReader + BlockHashReader + ChangeSetReader, + T: DBProvider + + BlockNumReader + + BlockHashReader + + ChangeSetReader + + StorageSettingsCache + + RocksDBProviderFactory + + NodePrimitivesProvider, >() { assert_state_provider::>(); } diff --git a/crates/storage/provider/src/traits/rocksdb_provider.rs b/crates/storage/provider/src/traits/rocksdb_provider.rs index 9d2186677d..3394fa16f6 100644 --- a/crates/storage/provider/src/traits/rocksdb_provider.rs +++ b/crates/storage/provider/src/traits/rocksdb_provider.rs @@ -1,4 +1,5 @@ -use crate::providers::RocksDBProvider; +use crate::{either_writer::RocksTxRefArg, providers::RocksDBProvider}; +use reth_storage_errors::provider::ProviderResult; /// `RocksDB` provider factory. /// @@ -13,4 +14,21 @@ pub trait RocksDBProviderFactory { /// commits, ensuring atomicity across all storage backends. #[cfg(all(unix, feature = "rocksdb"))] fn set_pending_rocksdb_batch(&self, batch: rocksdb::WriteBatchWithTransaction); + + /// Executes a closure with a `RocksDB` transaction for reading. + /// + /// This helper encapsulates all the cfg-gated `RocksDB` transaction handling for reads. + fn with_rocksdb_tx(&self, f: F) -> ProviderResult + where + F: FnOnce(RocksTxRefArg<'_>) -> ProviderResult, + { + #[cfg(all(unix, feature = "rocksdb"))] + { + let rocksdb = self.rocksdb_provider(); + let tx = rocksdb.tx(); + f(&tx) + } + #[cfg(not(all(unix, feature = "rocksdb")))] + f(()) + } }
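For reviewers who want to sanity-check the two mechanisms this patch introduces — the shared rank/select adjustment in `compute_history_rank`, and the fast/slow shard-append path in `append_account_history_shard` / `append_storage_history_shard` — here is a minimal standalone sketch of their semantics. The helper names (`rank_sketch`, `append_shard_sketch`), the plain sorted `Vec<u64>` shards, and the `partition_point`/`chunks` stand-ins are illustrative only and are not part of the patch; the real code operates on `BlockNumberList` via its `rank`/`select` and on `NUM_OF_INDICES_IN_SHARD`.

// Illustrative sketch (not part of the patch): rank_sketch mirrors the
// adjustment in `compute_history_rank`, and append_shard_sketch mirrors the
// fast/slow path of `append_account_history_shard` /
// `append_storage_history_shard`, using plain sorted Vec<u64> shards in
// place of `BlockNumberList`.

/// Stand-in for `compute_history_rank`: `partition_point` plays the role of
/// the shard's `rank` (count of entries <= block_number) and slice indexing
/// plays the role of `select`.
fn rank_sketch(shard: &[u64], block_number: u64) -> (usize, Option<u64>) {
    let mut rank = shard.partition_point(|&b| b <= block_number);
    // If block_number is itself a recorded change, step back so the returned
    // position points at that entry rather than past it.
    if rank.checked_sub(1).map(|r| shard[r]) == Some(block_number) {
        rank -= 1;
    }
    // The second value is the first modification at or after block_number.
    (rank, shard.get(rank).copied())
}

/// Stand-in for the batch shard-append logic, with a configurable limit in
/// place of `NUM_OF_INDICES_IN_SHARD`. Returns (shard key, shard contents)
/// pairs: completed shards are keyed by their highest block number, the open
/// shard keeps the u64::MAX sentinel key.
fn append_shard_sketch(
    mut open_shard: Vec<u64>,
    new_indices: Vec<u64>,
    shard_limit: usize,
) -> Vec<(u64, Vec<u64>)> {
    open_shard.extend(new_indices);

    // Fast path: everything still fits under the sentinel key.
    if open_shard.len() <= shard_limit {
        return vec![(u64::MAX, open_shard)];
    }

    // Slow path: rechunk. Every chunk but the last becomes a completed shard;
    // the trailing chunk keeps the sentinel key so later appends find it.
    let mut shards = Vec::new();
    let mut chunks = open_shard.chunks(shard_limit).peekable();
    while let Some(chunk) = chunks.next() {
        let key = if chunks.peek().is_some() { *chunk.last().unwrap() } else { u64::MAX };
        shards.push((key, chunk.to_vec()));
    }
    shards
}

fn main() {
    let shard = [10u64, 20, 30];
    assert_eq!(rank_sketch(&shard, 5), (0, Some(10)));  // before the first write
    assert_eq!(rank_sketch(&shard, 20), (1, Some(20))); // exact hit, rank stepped back
    assert_eq!(rank_sketch(&shard, 25), (2, Some(30))); // between writes
    assert_eq!(rank_sketch(&shard, 40), (3, None));     // past the last write in this shard

    // With a limit of 3, five indices split into one completed shard keyed by
    // its highest block (30) plus the open sentinel shard with the remainder.
    let shards = append_shard_sketch(vec![10, 20, 30], vec![40, 50], 3);
    assert_eq!(shards, vec![(30, vec![10, 20, 30]), (u64::MAX, vec![40, 50])]);
}

The asserts mirror, at a small limit, the boundary behaviour the new unit tests above exercise at `NUM_OF_INDICES_IN_SHARD` scale: an exact-fit append keeps a single sentinel shard, while overflowing it produces completed shards keyed by their highest block plus a trailing sentinel shard.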