mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-01-08 23:08:19 -05:00
feat(storage): add history lookup methods to EitherReader
Add `storage_history_info` and `account_history_info` methods to `EitherReader` that route history lookups to either MDBX cursors or RocksDB transactions based on storage settings. Changes: - Add `storage_history_info` method to EitherReader for StoragesHistory table - Add `account_history_info` method to EitherReader for AccountsHistory table - Add `cursor_history_info` helper function for MDBX cursor-based shard lookups - Export `find_changeset_block_from_index` from historical.rs for external use - Add 4 tests covering RocksDB routing and MDBX fallback scenarios This enables callers to perform history lookups through EitherReader without knowing whether the data is stored in MDBX or RocksDB. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
@@ -6,10 +6,10 @@ use std::{marker::PhantomData, ops::Range};
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
use crate::providers::rocksdb::RocksDBBatch;
|
||||
use crate::{
|
||||
providers::{StaticFileProvider, StaticFileProviderRWRefMut},
|
||||
providers::{HistoryInfo, StaticFileProvider, StaticFileProviderRWRefMut},
|
||||
StaticFileProviderFactory,
|
||||
};
|
||||
use alloy_primitives::{map::HashMap, Address, BlockNumber, TxHash, TxNumber};
|
||||
use alloy_primitives::{map::HashMap, Address, BlockNumber, StorageKey, TxHash, TxNumber};
|
||||
use reth_db::{
|
||||
cursor::DbCursorRO,
|
||||
static_file::TransactionSenderMask,
|
||||
@@ -587,6 +587,39 @@ where
|
||||
Self::RocksDB(tx) => tx.get::<tables::StoragesHistory>(key),
|
||||
}
|
||||
}
|
||||
|
||||
/// Lookup storage history and return [`HistoryInfo`] directly.
|
||||
///
|
||||
/// This performs the shard-walking logic to find which block contains the historical
|
||||
/// value for the given storage slot at the target block number.
|
||||
pub fn storage_history_info(
|
||||
&mut self,
|
||||
address: Address,
|
||||
storage_key: StorageKey,
|
||||
block_number: BlockNumber,
|
||||
lowest_available_block_number: Option<BlockNumber>,
|
||||
) -> ProviderResult<HistoryInfo> {
|
||||
match self {
|
||||
Self::Database(cursor, _) => {
|
||||
let key = StorageShardedKey::new(address, storage_key, block_number);
|
||||
cursor_history_info(
|
||||
cursor,
|
||||
key,
|
||||
|k| k.address == address && k.sharded_key.key == storage_key,
|
||||
block_number,
|
||||
lowest_available_block_number,
|
||||
)
|
||||
}
|
||||
Self::StaticFile(_, _) => Err(ProviderError::UnsupportedProvider),
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
Self::RocksDB(tx) => tx.storage_history_info(
|
||||
address,
|
||||
storage_key,
|
||||
block_number,
|
||||
lowest_available_block_number,
|
||||
),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<CURSOR, N: NodePrimitives> EitherReader<'_, CURSOR, N>
|
||||
@@ -605,6 +638,95 @@ where
|
||||
Self::RocksDB(tx) => tx.get::<tables::AccountsHistory>(key),
|
||||
}
|
||||
}
|
||||
|
||||
/// Lookup account history and return [`HistoryInfo`] directly.
|
||||
///
|
||||
/// This performs the shard-walking logic to find which block contains the historical
|
||||
/// value for the given account at the target block number.
|
||||
pub fn account_history_info(
|
||||
&mut self,
|
||||
address: Address,
|
||||
block_number: BlockNumber,
|
||||
lowest_available_block_number: Option<BlockNumber>,
|
||||
) -> ProviderResult<HistoryInfo> {
|
||||
match self {
|
||||
Self::Database(cursor, _) => {
|
||||
let key = ShardedKey::new(address, block_number);
|
||||
cursor_history_info(
|
||||
cursor,
|
||||
key,
|
||||
|k| k.key == address,
|
||||
block_number,
|
||||
lowest_available_block_number,
|
||||
)
|
||||
}
|
||||
Self::StaticFile(_, _) => Err(ProviderError::UnsupportedProvider),
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
Self::RocksDB(tx) => {
|
||||
tx.account_history_info(address, block_number, lowest_available_block_number)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Generic history lookup using a database cursor.
|
||||
///
|
||||
/// This implements the shard-walking logic for finding historical values in sharded tables
|
||||
/// like `AccountsHistory` and `StoragesHistory`.
|
||||
///
|
||||
/// # Arguments
|
||||
/// * `cursor` - Database cursor positioned at the start of the table
|
||||
/// * `key` - The sharded key to search for (includes target block number)
|
||||
/// * `key_filter` - Predicate to verify the found key belongs to the same entity
|
||||
/// * `block_number` - Target block number for the lookup
|
||||
/// * `lowest_available_block_number` - Pruning boundary (if history is pruned)
|
||||
fn cursor_history_info<T, K>(
|
||||
cursor: &mut impl DbCursorRO<T>,
|
||||
key: K,
|
||||
key_filter: impl Fn(&K) -> bool,
|
||||
block_number: BlockNumber,
|
||||
lowest_available_block_number: Option<BlockNumber>,
|
||||
) -> ProviderResult<HistoryInfo>
|
||||
where
|
||||
T: reth_db_api::table::Table<Key = K, Value = BlockNumberList>,
|
||||
{
|
||||
// Seek to the shard containing the target block. If the key doesn't exist,
|
||||
// the cursor will point to the next key, so we filter to ensure it's the same entity.
|
||||
if let Some(chunk) = cursor.seek(key)?.filter(|(k, _)| key_filter(k)).map(|x| x.1) {
|
||||
// Get the rank of the first entry before or equal to our block.
|
||||
let mut rank = chunk.rank(block_number);
|
||||
|
||||
// Adjust the rank, so that we have the rank of the first entry strictly before our
|
||||
// block (not equal to it).
|
||||
if rank.checked_sub(1).and_then(|r| chunk.select(r)) == Some(block_number) {
|
||||
rank -= 1;
|
||||
}
|
||||
|
||||
let found_block = chunk.select(rank);
|
||||
|
||||
// If our block is before the first entry in the index chunk and this first entry
|
||||
// doesn't equal to our block, it might be before the first write ever. To check, we
|
||||
// look at the previous entry and check if the key is the same.
|
||||
// This check is worth it, the `cursor.prev()` check is rarely triggered (the if will
|
||||
// short-circuit) and when it passes we save a full seek into the changeset/plain state
|
||||
// table.
|
||||
let is_before_first_write = rank == 0 &&
|
||||
found_block != Some(block_number) &&
|
||||
!cursor.prev()?.is_some_and(|(k, _)| key_filter(&k));
|
||||
|
||||
Ok(HistoryInfo::from_lookup(
|
||||
found_block,
|
||||
is_before_first_write,
|
||||
lowest_available_block_number,
|
||||
))
|
||||
} else if lowest_available_block_number.is_some() {
|
||||
// The key may have been written, but due to pruning we may not have changesets and
|
||||
// history, so we need to make a plain state lookup.
|
||||
Ok(HistoryInfo::MaybeInPlainState)
|
||||
} else {
|
||||
// The key has not been written to at all.
|
||||
Ok(HistoryInfo::NotYetWritten)
|
||||
}
|
||||
}
|
||||
|
||||
/// Destination for writing data.
|
||||
@@ -991,4 +1113,166 @@ mod rocksdb_tests {
|
||||
"Data should be visible after provider.commit()"
|
||||
);
|
||||
}
|
||||
|
||||
/// Test `EitherReader::storage_history_info` routes to `RocksDB` when configured.
|
||||
#[test]
|
||||
fn test_either_reader_storage_history_info_rocksdb() {
|
||||
use crate::providers::HistoryInfo;
|
||||
|
||||
let factory = create_test_provider_factory();
|
||||
|
||||
// Enable RocksDB for storage history
|
||||
factory.set_storage_settings_cache(
|
||||
StorageSettings::legacy().with_storages_history_in_rocksdb(true),
|
||||
);
|
||||
|
||||
let address = Address::from([0x42; 20]);
|
||||
let storage_key = B256::from([0x11; 32]);
|
||||
|
||||
// Create history shards in RocksDB
|
||||
let rocksdb = factory.rocksdb_provider();
|
||||
let chunk = IntegerList::new([100, 200, 300]).unwrap();
|
||||
let shard_key = StorageShardedKey::new(address, storage_key, u64::MAX);
|
||||
rocksdb.put::<tables::StoragesHistory>(shard_key, &chunk).unwrap();
|
||||
|
||||
// Create EitherReader with RocksDB
|
||||
let rocksdb_tx = rocksdb.tx();
|
||||
let provider = factory.database_provider_ro().unwrap();
|
||||
let mut reader = EitherReader::new_storages_history(&provider, &rocksdb_tx).unwrap();
|
||||
|
||||
// Verify we got a RocksDB reader
|
||||
assert!(matches!(reader, EitherReader::RocksDB(_)));
|
||||
|
||||
// Query for block 150: should find block 200 in changeset
|
||||
let result = reader.storage_history_info(address, storage_key, 150, None).unwrap();
|
||||
assert_eq!(result, HistoryInfo::InChangeset(200));
|
||||
|
||||
// Query for block 50: should return NotYetWritten (before first entry)
|
||||
let result = reader.storage_history_info(address, storage_key, 50, None).unwrap();
|
||||
assert_eq!(result, HistoryInfo::NotYetWritten);
|
||||
|
||||
// Query for block 500: should return InPlainState (after last entry)
|
||||
let result = reader.storage_history_info(address, storage_key, 500, None).unwrap();
|
||||
assert_eq!(result, HistoryInfo::InPlainState);
|
||||
}
|
||||
|
||||
/// Test `EitherReader::account_history_info` routes to `RocksDB` when configured.
|
||||
#[test]
|
||||
fn test_either_reader_account_history_info_rocksdb() {
|
||||
use crate::providers::HistoryInfo;
|
||||
|
||||
let factory = create_test_provider_factory();
|
||||
|
||||
// Enable RocksDB for account history
|
||||
factory.set_storage_settings_cache(
|
||||
StorageSettings::legacy().with_account_history_in_rocksdb(true),
|
||||
);
|
||||
|
||||
let address = Address::from([0x42; 20]);
|
||||
|
||||
// Create history shards in RocksDB
|
||||
let rocksdb = factory.rocksdb_provider();
|
||||
let chunk = IntegerList::new([100, 200, 300]).unwrap();
|
||||
let shard_key = ShardedKey::new(address, u64::MAX);
|
||||
rocksdb.put::<tables::AccountsHistory>(shard_key, &chunk).unwrap();
|
||||
|
||||
// Create EitherReader with RocksDB
|
||||
let rocksdb_tx = rocksdb.tx();
|
||||
let provider = factory.database_provider_ro().unwrap();
|
||||
let mut reader = EitherReader::new_accounts_history(&provider, &rocksdb_tx).unwrap();
|
||||
|
||||
// Verify we got a RocksDB reader
|
||||
assert!(matches!(reader, EitherReader::RocksDB(_)));
|
||||
|
||||
// Query for block 150: should find block 200 in changeset
|
||||
let result = reader.account_history_info(address, 150, None).unwrap();
|
||||
assert_eq!(result, HistoryInfo::InChangeset(200));
|
||||
|
||||
// Query for block 50: should return NotYetWritten (before first entry)
|
||||
let result = reader.account_history_info(address, 50, None).unwrap();
|
||||
assert_eq!(result, HistoryInfo::NotYetWritten);
|
||||
|
||||
// Query for block 500: should return InPlainState (after last entry)
|
||||
let result = reader.account_history_info(address, 500, None).unwrap();
|
||||
assert_eq!(result, HistoryInfo::InPlainState);
|
||||
}
|
||||
|
||||
/// Test `EitherReader::storage_history_info` falls back to MDBX when RocksDB is disabled.
|
||||
#[test]
|
||||
fn test_either_reader_storage_history_info_mdbx_fallback() {
|
||||
use crate::providers::HistoryInfo;
|
||||
|
||||
let factory = create_test_provider_factory();
|
||||
|
||||
// Keep RocksDB disabled for storage history (default)
|
||||
factory.set_storage_settings_cache(StorageSettings::legacy());
|
||||
|
||||
let address = Address::from([0x42; 20]);
|
||||
let storage_key = B256::from([0x11; 32]);
|
||||
|
||||
// Create history shards in MDBX
|
||||
{
|
||||
let provider = factory.database_provider_rw().unwrap();
|
||||
let chunk = IntegerList::new([100, 200, 300]).unwrap();
|
||||
let shard_key = StorageShardedKey::new(address, storage_key, u64::MAX);
|
||||
provider.tx_ref().put::<tables::StoragesHistory>(shard_key, chunk).unwrap();
|
||||
provider.commit().unwrap();
|
||||
}
|
||||
|
||||
// Create EitherReader - should use MDBX
|
||||
let rocksdb = factory.rocksdb_provider();
|
||||
let rocksdb_tx = rocksdb.tx();
|
||||
let provider = factory.database_provider_ro().unwrap();
|
||||
let mut reader = EitherReader::new_storages_history(&provider, &rocksdb_tx).unwrap();
|
||||
|
||||
// Verify we got a Database reader
|
||||
assert!(matches!(reader, EitherReader::Database(_, _)));
|
||||
|
||||
// Query for block 150: should find block 200 in changeset
|
||||
let result = reader.storage_history_info(address, storage_key, 150, None).unwrap();
|
||||
assert_eq!(result, HistoryInfo::InChangeset(200));
|
||||
|
||||
// Query for block 50: should return NotYetWritten
|
||||
let result = reader.storage_history_info(address, storage_key, 50, None).unwrap();
|
||||
assert_eq!(result, HistoryInfo::NotYetWritten);
|
||||
}
|
||||
|
||||
/// Test `EitherReader::account_history_info` falls back to MDBX when RocksDB is disabled.
|
||||
#[test]
|
||||
fn test_either_reader_account_history_info_mdbx_fallback() {
|
||||
use crate::providers::HistoryInfo;
|
||||
|
||||
let factory = create_test_provider_factory();
|
||||
|
||||
// Keep RocksDB disabled for account history (default)
|
||||
factory.set_storage_settings_cache(StorageSettings::legacy());
|
||||
|
||||
let address = Address::from([0x42; 20]);
|
||||
|
||||
// Create history shards in MDBX
|
||||
{
|
||||
let provider = factory.database_provider_rw().unwrap();
|
||||
let chunk = IntegerList::new([100, 200, 300]).unwrap();
|
||||
let shard_key = ShardedKey::new(address, u64::MAX);
|
||||
provider.tx_ref().put::<tables::AccountsHistory>(shard_key, chunk).unwrap();
|
||||
provider.commit().unwrap();
|
||||
}
|
||||
|
||||
// Create EitherReader - should use MDBX
|
||||
let rocksdb = factory.rocksdb_provider();
|
||||
let rocksdb_tx = rocksdb.tx();
|
||||
let provider = factory.database_provider_ro().unwrap();
|
||||
let mut reader = EitherReader::new_accounts_history(&provider, &rocksdb_tx).unwrap();
|
||||
|
||||
// Verify we got a Database reader
|
||||
assert!(matches!(reader, EitherReader::Database(_, _)));
|
||||
|
||||
// Query for block 150: should find block 200 in changeset
|
||||
let result = reader.account_history_info(address, 150, None).unwrap();
|
||||
assert_eq!(result, HistoryInfo::InChangeset(200));
|
||||
|
||||
// Query for block 50: should return NotYetWritten
|
||||
let result = reader.account_history_info(address, 50, None).unwrap();
|
||||
assert_eq!(result, HistoryInfo::NotYetWritten);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -16,7 +16,8 @@ pub use static_file::{
|
||||
mod state;
|
||||
pub use state::{
|
||||
historical::{
|
||||
HistoricalStateProvider, HistoricalStateProviderRef, HistoryInfo, LowestAvailableBlocks,
|
||||
find_changeset_block_from_index, HistoricalStateProvider, HistoricalStateProviderRef,
|
||||
HistoryInfo, LowestAvailableBlocks,
|
||||
},
|
||||
latest::{LatestStateProvider, LatestStateProviderRef},
|
||||
overlay::{OverlayStateProvider, OverlayStateProviderFactory},
|
||||
|
||||
@@ -568,6 +568,25 @@ fn needs_prev_shard_check(rank: u64, found_block: Option<u64>, block_number: Blo
|
||||
rank == 0 && found_block != Some(block_number)
|
||||
}
|
||||
|
||||
/// Determines where to find the historical value based on computed shard lookup results.
|
||||
///
|
||||
/// This is a pure function shared by both MDBX and `RocksDB` backends.
|
||||
/// Delegates to [`HistoryInfo::from_lookup`].
|
||||
///
|
||||
/// # Arguments
|
||||
/// * `found_block` - The block number from the shard lookup
|
||||
/// * `is_before_first_write` - True if the target block is before the first write to this key. This
|
||||
/// should be computed as: `rank == 0 && found_block != Some(block_number) && !has_previous_shard`
|
||||
/// where `has_previous_shard` comes from a lazy `cursor.prev()` check.
|
||||
/// * `lowest_available` - Lowest block where history is available (pruning boundary)
|
||||
pub const fn find_changeset_block_from_index(
|
||||
found_block: Option<u64>,
|
||||
is_before_first_write: bool,
|
||||
lowest_available: Option<BlockNumber>,
|
||||
) -> HistoryInfo {
|
||||
HistoryInfo::from_lookup(found_block, is_before_first_write, lowest_available)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::needs_prev_shard_check;
|
||||
|
||||
Reference in New Issue
Block a user