Compare commits

...

3 Commits

Author SHA1 Message Date
yongkangc
9c404e16de test(storage): add parametrized backend equivalence tests for history lookups
Add tests that verify MDBX and RocksDB produce identical results for
history lookups using EitherWriter and EitherReader abstractions.
These tests address joshieDo's PR review feedback to use the Either*
types instead of duplicating backend-specific code.

- Add account_history_info and storage_history_info methods to EitherReader
- Add HistoryQuery struct and run_*_history_scenario helper functions
- Test scenarios: single shard, multiple shards, no history, pruning boundary
2026-01-10 02:03:16 +00:00
yongkangc
8b453292fb feat(storage): wire RocksDB into history lookups via EitherReader
This wires RocksDB into the history lookup paths:

- Adds account_history_info and storage_history_info methods to EitherReader
- Updates HistoricalStateProviderRef to use EitherReader for lookups
- Adds RocksDBProviderFactory trait bounds to provider impls
- Uses the rank/select pattern for efficient binary search in shards

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-01-09 03:13:47 +00:00
yongkangc
32e7f0572e feat(storage): add RocksDB history lookup methods 2026-01-09 02:52:11 +00:00
4 changed files with 932 additions and 91 deletions

View File

@@ -13,7 +13,9 @@ use crate::{
providers::{StaticFileProvider, StaticFileProviderRWRefMut},
StaticFileProviderFactory,
};
use alloy_primitives::{map::HashMap, Address, BlockNumber, TxHash, TxNumber};
use alloy_primitives::{map::HashMap, Address, BlockNumber, TxHash, TxNumber, B256};
use crate::providers::{needs_prev_shard_check, HistoryInfo};
use rayon::slice::ParallelSliceMut;
use reth_db::{
cursor::{DbCursorRO, DbDupCursorRW},
@@ -720,6 +722,76 @@ where
Self::RocksDB(tx) => tx.get::<tables::StoragesHistory>(key),
}
}
/// Lookup storage history and return [`HistoryInfo`] directly.
///
/// Uses the rank/select logic to efficiently find the first block >= target
/// where the storage slot was modified.
pub fn storage_history_info(
&mut self,
address: Address,
storage_key: B256,
block_number: BlockNumber,
lowest_available_block_number: Option<BlockNumber>,
) -> ProviderResult<HistoryInfo> {
match self {
Self::Database(cursor, _) => {
// Lookup the history chunk in the history index. If the key does not appear in the
// index, the first chunk for the next key will be returned so we filter out chunks
// that have a different key.
let key = StorageShardedKey::new(address, storage_key, block_number);
if let Some(chunk) = cursor
.seek(key)?
.filter(|(k, _)| k.address == address && k.sharded_key.key == storage_key)
.map(|x| x.1)
{
// Get the rank of the first entry before or equal to our block.
let mut rank = chunk.rank(block_number);
// Adjust the rank, so that we have the rank of the first entry strictly before
// our block (not equal to it).
if rank.checked_sub(1).and_then(|r| chunk.select(r)) == Some(block_number) {
rank -= 1;
}
let found_block = chunk.select(rank);
// If our block is before the first entry in the index chunk and this first
// entry doesn't equal to our block, it might be before the first write ever.
// To check, we look at the previous entry and check if the key is the same.
// This check is worth it, the `cursor.prev()` check is rarely triggered (the
// if will short-circuit) and when it passes we save a full seek into the
// changeset/plain state table.
let is_before_first_write =
needs_prev_shard_check(rank, found_block, block_number) &&
cursor.prev()?.is_none_or(|(k, _)| {
k.address != address || k.sharded_key.key != storage_key
});
Ok(HistoryInfo::from_lookup(
found_block,
is_before_first_write,
lowest_available_block_number,
))
} else if lowest_available_block_number.is_some() {
// The key may have been written, but due to pruning we may not have changesets
// and history, so we need to make a plain state lookup.
Ok(HistoryInfo::MaybeInPlainState)
} else {
// The key has not been written to at all.
Ok(HistoryInfo::NotYetWritten)
}
}
Self::StaticFile(_, _) => Err(ProviderError::UnsupportedProvider),
#[cfg(all(unix, feature = "rocksdb"))]
Self::RocksDB(tx) => tx.storage_history_info(
address,
storage_key,
block_number,
lowest_available_block_number,
),
}
}
}
impl<CURSOR, N: NodePrimitives> EitherReader<'_, CURSOR, N>
@@ -738,6 +810,68 @@ where
Self::RocksDB(tx) => tx.get::<tables::AccountsHistory>(key),
}
}
/// Lookup account history and return [`HistoryInfo`] directly.
///
/// Uses the rank/select logic to efficiently find the first block >= target
/// where the account was modified.
pub fn account_history_info(
&mut self,
address: Address,
block_number: BlockNumber,
lowest_available_block_number: Option<BlockNumber>,
) -> ProviderResult<HistoryInfo> {
match self {
Self::Database(cursor, _) => {
// Lookup the history chunk in the history index. If the key does not appear in the
// index, the first chunk for the next key will be returned so we filter out chunks
// that have a different key.
let key = ShardedKey::new(address, block_number);
if let Some(chunk) =
cursor.seek(key)?.filter(|(k, _)| k.key == address).map(|x| x.1)
{
// Get the rank of the first entry before or equal to our block.
let mut rank = chunk.rank(block_number);
// Adjust the rank, so that we have the rank of the first entry strictly before
// our block (not equal to it).
if rank.checked_sub(1).and_then(|r| chunk.select(r)) == Some(block_number) {
rank -= 1;
}
let found_block = chunk.select(rank);
// If our block is before the first entry in the index chunk and this first
// entry doesn't equal to our block, it might be before the first write ever.
// To check, we look at the previous entry and check if the key is the same.
// This check is worth it, the `cursor.prev()` check is rarely triggered (the
// if will short-circuit) and when it passes we save a full seek into the
// changeset/plain state table.
let is_before_first_write =
needs_prev_shard_check(rank, found_block, block_number) &&
cursor.prev()?.is_none_or(|(k, _)| k.key != address);
Ok(HistoryInfo::from_lookup(
found_block,
is_before_first_write,
lowest_available_block_number,
))
} else if lowest_available_block_number.is_some() {
// The key may have been written, but due to pruning we may not have changesets
// and history, so we need to make a plain state lookup.
Ok(HistoryInfo::MaybeInPlainState)
} else {
// The key has not been written to at all.
Ok(HistoryInfo::NotYetWritten)
}
}
Self::StaticFile(_, _) => Err(ProviderError::UnsupportedProvider),
#[cfg(all(unix, feature = "rocksdb"))]
Self::RocksDB(tx) => {
tx.account_history_info(address, block_number, lowest_available_block_number)
}
}
}
}
impl<CURSOR, N: NodePrimitives> EitherReader<'_, CURSOR, N>
@@ -886,7 +1020,10 @@ mod tests {
mod rocksdb_tests {
use super::*;
use crate::{
providers::rocksdb::{RocksDBBuilder, RocksDBProvider},
providers::{
rocksdb::{RocksDBBuilder, RocksDBProvider},
HistoryInfo,
},
test_utils::create_test_provider_factory,
RocksDBProviderFactory,
};
@@ -894,8 +1031,11 @@ mod rocksdb_tests {
use reth_db_api::{
models::{storage_sharded_key::StorageShardedKey, IntegerList, ShardedKey},
tables,
transaction::DbTxMut,
};
use reth_ethereum_primitives::EthPrimitives;
use reth_storage_api::{DatabaseProviderFactory, StorageSettings};
use std::marker::PhantomData;
use tempfile::TempDir;
fn create_rocksdb_provider() -> (TempDir, RocksDBProvider) {
@@ -1185,4 +1325,387 @@ mod rocksdb_tests {
"Data should be visible after provider.commit()"
);
}
// ==================== Parametrized Backend Equivalence Tests ====================
//
// These tests verify that MDBX and RocksDB produce identical results for history lookups.
// Each scenario sets up the same data in both backends and asserts identical HistoryInfo.
/// Query parameters for a history lookup test case.
struct HistoryQuery {
block_number: BlockNumber,
lowest_available: Option<BlockNumber>,
expected: HistoryInfo,
}
// Type aliases for cursor types (needed for EitherWriter/EitherReader type inference)
type AccountsHistoryWriteCursor =
reth_db::mdbx::cursor::Cursor<reth_db::mdbx::RW, tables::AccountsHistory>;
type StoragesHistoryWriteCursor =
reth_db::mdbx::cursor::Cursor<reth_db::mdbx::RW, tables::StoragesHistory>;
type AccountsHistoryReadCursor =
reth_db::mdbx::cursor::Cursor<reth_db::mdbx::RO, tables::AccountsHistory>;
type StoragesHistoryReadCursor =
reth_db::mdbx::cursor::Cursor<reth_db::mdbx::RO, tables::StoragesHistory>;
/// Runs the same account history queries against both MDBX and `RocksDB` backends,
/// asserting they produce identical results.
fn run_account_history_scenario(
scenario_name: &str,
address: Address,
shards: &[(BlockNumber, Vec<BlockNumber>)], // (shard_highest_block, blocks_in_shard)
queries: &[HistoryQuery],
) {
// Setup MDBX and RocksDB with identical data using EitherWriter
let factory = create_test_provider_factory();
let mdbx_provider = factory.database_provider_rw().unwrap();
let (temp_dir, rocks_provider) = create_rocksdb_provider();
// Create writers for both backends
let mut mdbx_writer: EitherWriter<'_, AccountsHistoryWriteCursor, EthPrimitives> =
EitherWriter::Database(
mdbx_provider.tx_ref().cursor_write::<tables::AccountsHistory>().unwrap(),
);
let mut rocks_writer: EitherWriter<'_, AccountsHistoryWriteCursor, EthPrimitives> =
EitherWriter::RocksDB(rocks_provider.batch());
// Write identical data to both backends in a single loop
for (highest_block, blocks) in shards {
let key = ShardedKey::new(address, *highest_block);
let value = IntegerList::new(blocks.clone()).unwrap();
mdbx_writer.put_account_history(key.clone(), &value).unwrap();
rocks_writer.put_account_history(key, &value).unwrap();
}
// Commit both backends
drop(mdbx_writer);
mdbx_provider.commit().unwrap();
if let EitherWriter::RocksDB(batch) = rocks_writer {
batch.commit().unwrap();
}
// Run queries against both backends using EitherReader
let mdbx_ro = factory.database_provider_ro().unwrap();
let rocks_tx = rocks_provider.tx();
for (i, query) in queries.iter().enumerate() {
// MDBX query via EitherReader
let mut mdbx_reader: EitherReader<'_, AccountsHistoryReadCursor, EthPrimitives> =
EitherReader::Database(
mdbx_ro.tx_ref().cursor_read::<tables::AccountsHistory>().unwrap(),
PhantomData,
);
let mdbx_result = mdbx_reader
.account_history_info(address, query.block_number, query.lowest_available)
.unwrap();
// RocksDB query via EitherReader (uses same cursor type for type consistency)
let mut rocks_reader: EitherReader<'_, AccountsHistoryReadCursor, EthPrimitives> =
EitherReader::RocksDB(&rocks_tx);
let rocks_result = rocks_reader
.account_history_info(address, query.block_number, query.lowest_available)
.unwrap();
// Assert both backends produce identical results
assert_eq!(
mdbx_result,
rocks_result,
"Backend mismatch in scenario '{}' query {}: block={}, lowest={:?}\n\
MDBX: {:?}, RocksDB: {:?}",
scenario_name,
i,
query.block_number,
query.lowest_available,
mdbx_result,
rocks_result
);
// Also verify against expected result
assert_eq!(
mdbx_result,
query.expected,
"Unexpected result in scenario '{}' query {}: block={}, lowest={:?}\n\
Got: {:?}, Expected: {:?}",
scenario_name,
i,
query.block_number,
query.lowest_available,
mdbx_result,
query.expected
);
}
rocks_tx.rollback().unwrap();
drop(temp_dir);
}
/// Runs the same storage history queries against both MDBX and `RocksDB` backends,
/// asserting they produce identical results.
fn run_storage_history_scenario(
scenario_name: &str,
address: Address,
storage_key: B256,
shards: &[(BlockNumber, Vec<BlockNumber>)], // (shard_highest_block, blocks_in_shard)
queries: &[HistoryQuery],
) {
// Setup MDBX and RocksDB with identical data using EitherWriter
let factory = create_test_provider_factory();
let mdbx_provider = factory.database_provider_rw().unwrap();
let (temp_dir, rocks_provider) = create_rocksdb_provider();
// Create writers for both backends
let mut mdbx_writer: EitherWriter<'_, StoragesHistoryWriteCursor, EthPrimitives> =
EitherWriter::Database(
mdbx_provider.tx_ref().cursor_write::<tables::StoragesHistory>().unwrap(),
);
let mut rocks_writer: EitherWriter<'_, StoragesHistoryWriteCursor, EthPrimitives> =
EitherWriter::RocksDB(rocks_provider.batch());
// Write identical data to both backends in a single loop
for (highest_block, blocks) in shards {
let key = StorageShardedKey::new(address, storage_key, *highest_block);
let value = IntegerList::new(blocks.clone()).unwrap();
mdbx_writer.put_storage_history(key.clone(), &value).unwrap();
rocks_writer.put_storage_history(key, &value).unwrap();
}
// Commit both backends
drop(mdbx_writer);
mdbx_provider.commit().unwrap();
if let EitherWriter::RocksDB(batch) = rocks_writer {
batch.commit().unwrap();
}
// Run queries against both backends using EitherReader
let mdbx_ro = factory.database_provider_ro().unwrap();
let rocks_tx = rocks_provider.tx();
for (i, query) in queries.iter().enumerate() {
// MDBX query via EitherReader
let mut mdbx_reader: EitherReader<'_, StoragesHistoryReadCursor, EthPrimitives> =
EitherReader::Database(
mdbx_ro.tx_ref().cursor_read::<tables::StoragesHistory>().unwrap(),
PhantomData,
);
let mdbx_result = mdbx_reader
.storage_history_info(
address,
storage_key,
query.block_number,
query.lowest_available,
)
.unwrap();
// RocksDB query via EitherReader (uses same cursor type for type consistency)
let mut rocks_reader: EitherReader<'_, StoragesHistoryReadCursor, EthPrimitives> =
EitherReader::RocksDB(&rocks_tx);
let rocks_result = rocks_reader
.storage_history_info(
address,
storage_key,
query.block_number,
query.lowest_available,
)
.unwrap();
// Assert both backends produce identical results
assert_eq!(
mdbx_result,
rocks_result,
"Backend mismatch in scenario '{}' query {}: block={}, lowest={:?}\n\
MDBX: {:?}, RocksDB: {:?}",
scenario_name,
i,
query.block_number,
query.lowest_available,
mdbx_result,
rocks_result
);
// Also verify against expected result
assert_eq!(
mdbx_result,
query.expected,
"Unexpected result in scenario '{}' query {}: block={}, lowest={:?}\n\
Got: {:?}, Expected: {:?}",
scenario_name,
i,
query.block_number,
query.lowest_available,
mdbx_result,
query.expected
);
}
rocks_tx.rollback().unwrap();
drop(temp_dir);
}
/// Tests account history lookups across both MDBX and `RocksDB` backends.
///
/// Covers the following scenarios from PR2's `RocksDB`-only tests:
/// 1. Single shard - basic lookups within one shard
/// 2. Multiple shards - `prev()` shard detection and transitions
/// 3. No history - query address with no entries
/// 4. Pruned before first entry - `lowest_available` boundary behavior
#[test]
fn test_account_history_info_both_backends() {
let address = Address::from([0x42; 20]);
// Scenario 1: Single shard with blocks [100, 200, 300]
run_account_history_scenario(
"single_shard",
address,
&[(u64::MAX, vec![100, 200, 300])],
&[
// Before first entry -> NotYetWritten
HistoryQuery {
block_number: 50,
lowest_available: None,
expected: HistoryInfo::NotYetWritten,
},
// Between entries -> InChangeset(next_write)
HistoryQuery {
block_number: 150,
lowest_available: None,
expected: HistoryInfo::InChangeset(200),
},
// Exact match on entry -> InChangeset(same_block)
HistoryQuery {
block_number: 300,
lowest_available: None,
expected: HistoryInfo::InChangeset(300),
},
// After last entry in last shard -> InPlainState
HistoryQuery {
block_number: 500,
lowest_available: None,
expected: HistoryInfo::InPlainState,
},
],
);
// Scenario 2: Multiple shards - tests prev() shard detection
run_account_history_scenario(
"multiple_shards",
address,
&[
(500, vec![100, 200, 300, 400, 500]), // First shard ends at 500
(u64::MAX, vec![600, 700, 800]), // Last shard
],
&[
// Before first shard, no prev -> NotYetWritten
HistoryQuery {
block_number: 50,
lowest_available: None,
expected: HistoryInfo::NotYetWritten,
},
// Within first shard
HistoryQuery {
block_number: 150,
lowest_available: None,
expected: HistoryInfo::InChangeset(200),
},
// Between shards - prev() should find first shard
HistoryQuery {
block_number: 550,
lowest_available: None,
expected: HistoryInfo::InChangeset(600),
},
// After all entries
HistoryQuery {
block_number: 900,
lowest_available: None,
expected: HistoryInfo::InPlainState,
},
],
);
// Scenario 3: No history for address
let address_without_history = Address::from([0x43; 20]);
run_account_history_scenario(
"no_history",
address_without_history,
&[], // No shards for this address
&[HistoryQuery {
block_number: 150,
lowest_available: None,
expected: HistoryInfo::NotYetWritten,
}],
);
// Scenario 4: Query at pruning boundary
// Note: We test block >= lowest_available because HistoricalStateProviderRef
// errors on blocks below the pruning boundary before doing the lookup.
// The RocksDB implementation doesn't have this check at the same level.
// This tests that when pruning IS available, both backends agree.
run_account_history_scenario(
"with_pruning_boundary",
address,
&[(u64::MAX, vec![100, 200, 300])],
&[
// At pruning boundary -> InChangeset(first entry after block)
HistoryQuery {
block_number: 100,
lowest_available: Some(100),
expected: HistoryInfo::InChangeset(100),
},
// After pruning boundary, between entries
HistoryQuery {
block_number: 150,
lowest_available: Some(100),
expected: HistoryInfo::InChangeset(200),
},
],
);
}
/// Tests storage history lookups across both MDBX and `RocksDB` backends.
#[test]
fn test_storage_history_info_both_backends() {
let address = Address::from([0x42; 20]);
let storage_key = B256::from([0x01; 32]);
let other_storage_key = B256::from([0x02; 32]);
// Single shard with blocks [100, 200, 300]
run_storage_history_scenario(
"storage_single_shard",
address,
storage_key,
&[(u64::MAX, vec![100, 200, 300])],
&[
// Before first entry -> NotYetWritten
HistoryQuery {
block_number: 50,
lowest_available: None,
expected: HistoryInfo::NotYetWritten,
},
// Between entries -> InChangeset(next_write)
HistoryQuery {
block_number: 150,
lowest_available: None,
expected: HistoryInfo::InChangeset(200),
},
// After last entry -> InPlainState
HistoryQuery {
block_number: 500,
lowest_available: None,
expected: HistoryInfo::InPlainState,
},
],
);
// No history for different storage key
run_storage_history_scenario(
"storage_no_history",
address,
other_storage_key,
&[], // No shards for this storage key
&[HistoryQuery {
block_number: 150,
lowest_available: None,
expected: HistoryInfo::NotYetWritten,
}],
);
}
}

View File

@@ -16,7 +16,8 @@ pub use static_file::{
mod state;
pub use state::{
historical::{
HistoricalStateProvider, HistoricalStateProviderRef, HistoryInfo, LowestAvailableBlocks,
needs_prev_shard_check, HistoricalStateProvider, HistoricalStateProviderRef, HistoryInfo,
LowestAvailableBlocks,
},
latest::{LatestStateProvider, LatestStateProviderRef},
overlay::{OverlayStateProvider, OverlayStateProviderFactory},

View File

@@ -1,7 +1,10 @@
use super::metrics::{RocksDBMetrics, RocksDBOperation};
use crate::providers::{needs_prev_shard_check, HistoryInfo};
use alloy_primitives::{Address, BlockNumber, B256};
use reth_db_api::{
table::{Compress, Decompress, Encode, Table},
tables, DatabaseError,
models::{storage_sharded_key::StorageShardedKey, ShardedKey},
table::{Compress, Decode, Decompress, Encode, Table},
tables, BlockNumberList, DatabaseError,
};
use reth_storage_errors::{
db::{DatabaseErrorInfo, DatabaseWriteError, DatabaseWriteOperation, LogLevel},
@@ -9,8 +12,8 @@ use reth_storage_errors::{
};
use rocksdb::{
BlockBasedOptions, Cache, ColumnFamilyDescriptor, CompactionPri, DBCompressionType,
IteratorMode, Options, Transaction, TransactionDB, TransactionDBOptions, TransactionOptions,
WriteBatchWithTransaction, WriteOptions,
DBRawIteratorWithThreadMode, IteratorMode, Options, Transaction, TransactionDB,
TransactionDBOptions, TransactionOptions, WriteBatchWithTransaction, WriteOptions,
};
use std::{
fmt,
@@ -666,6 +669,171 @@ impl<'db> RocksTx<'db> {
ProviderError::Database(DatabaseError::Other(format!("rollback failed: {e}")))
})
}
/// Lookup account history and return [`HistoryInfo`] directly.
///
/// This is a thin wrapper around `history_info` that:
/// - Builds the `ShardedKey` for the address + target block.
/// - Validates that the found shard belongs to the same address.
pub fn account_history_info(
&self,
address: Address,
block_number: BlockNumber,
lowest_available_block_number: Option<BlockNumber>,
) -> ProviderResult<HistoryInfo> {
let key = ShardedKey::new(address, block_number);
self.history_info::<tables::AccountsHistory>(
key.encode().as_ref(),
block_number,
lowest_available_block_number,
|key_bytes| Ok(<ShardedKey<Address> as Decode>::decode(key_bytes)?.key == address),
|prev_bytes| {
<ShardedKey<Address> as Decode>::decode(prev_bytes)
.map(|k| k.key == address)
.unwrap_or(false)
},
)
}
/// Lookup storage history and return [`HistoryInfo`] directly.
///
/// This is a thin wrapper around `history_info` that:
/// - Builds the `StorageShardedKey` for address + storage key + target block.
/// - Validates that the found shard belongs to the same address and storage slot.
pub fn storage_history_info(
&self,
address: Address,
storage_key: B256,
block_number: BlockNumber,
lowest_available_block_number: Option<BlockNumber>,
) -> ProviderResult<HistoryInfo> {
let key = StorageShardedKey::new(address, storage_key, block_number);
self.history_info::<tables::StoragesHistory>(
key.encode().as_ref(),
block_number,
lowest_available_block_number,
|key_bytes| {
let k = <StorageShardedKey as Decode>::decode(key_bytes)?;
Ok(k.address == address && k.sharded_key.key == storage_key)
},
|prev_bytes| {
<StorageShardedKey as Decode>::decode(prev_bytes)
.map(|k| k.address == address && k.sharded_key.key == storage_key)
.unwrap_or(false)
},
)
}
/// Generic history lookup for sharded history tables.
///
/// Seeks to the shard containing `block_number`, checks if the key matches via `key_matches`,
/// and uses `prev_key_matches` to detect if a previous shard exists for the same key.
fn history_info<T>(
&self,
encoded_key: &[u8],
block_number: BlockNumber,
lowest_available_block_number: Option<BlockNumber>,
key_matches: impl FnOnce(&[u8]) -> Result<bool, reth_db_api::DatabaseError>,
prev_key_matches: impl Fn(&[u8]) -> bool,
) -> ProviderResult<HistoryInfo>
where
T: Table<Value = BlockNumberList>,
{
let cf = self.provider.0.db.cf_handle(T::NAME).ok_or_else(|| {
ProviderError::Database(DatabaseError::Other(format!(
"column family not found: {}",
T::NAME
)))
})?;
// Create a raw iterator to access key bytes directly.
let mut iter: DBRawIteratorWithThreadMode<'_, Transaction<'_, TransactionDB>> =
self.inner.raw_iterator_cf(&cf);
// Seek to the smallest key >= encoded_key.
iter.seek(encoded_key);
Self::raw_iter_status_ok(&iter)?;
if !iter.valid() {
// No shard found at or after target block.
return if lowest_available_block_number.is_some() {
// The key may have been written, but due to pruning we may not have changesets
// and history, so we need to make a plain state lookup.
Ok(HistoryInfo::MaybeInPlainState)
} else {
// The key has not been written to at all.
Ok(HistoryInfo::NotYetWritten)
};
}
// Check if the found key matches our target entity.
let Some(key_bytes) = iter.key() else {
return if lowest_available_block_number.is_some() {
Ok(HistoryInfo::MaybeInPlainState)
} else {
Ok(HistoryInfo::NotYetWritten)
};
};
if !key_matches(key_bytes)? {
// The found key is for a different entity.
return if lowest_available_block_number.is_some() {
Ok(HistoryInfo::MaybeInPlainState)
} else {
Ok(HistoryInfo::NotYetWritten)
};
}
// Decompress the block list for this shard.
let Some(value_bytes) = iter.value() else {
return if lowest_available_block_number.is_some() {
Ok(HistoryInfo::MaybeInPlainState)
} else {
Ok(HistoryInfo::NotYetWritten)
};
};
let chunk = BlockNumberList::decompress(value_bytes)?;
// Get the rank of the first entry before or equal to our block.
let mut rank = chunk.rank(block_number);
// Adjust the rank, so that we have the rank of the first entry strictly before our
// block (not equal to it).
if rank.checked_sub(1).and_then(|r| chunk.select(r)) == Some(block_number) {
rank -= 1;
}
let found_block = chunk.select(rank);
// Lazy check for previous shard - only called when needed.
// If we can step to a previous shard for this same key, history already exists,
// so the target block is not before the first write.
let is_before_first_write = if needs_prev_shard_check(rank, found_block, block_number) {
iter.prev();
Self::raw_iter_status_ok(&iter)?;
let has_prev = iter.valid() && iter.key().is_some_and(&prev_key_matches);
!has_prev
} else {
false
};
Ok(HistoryInfo::from_lookup(
found_block,
is_before_first_write,
lowest_available_block_number,
))
}
/// Returns an error if the raw iterator is in an invalid state due to an I/O error.
fn raw_iter_status_ok(
iter: &DBRawIteratorWithThreadMode<'_, Transaction<'_, TransactionDB>>,
) -> ProviderResult<()> {
iter.status().map_err(|e| {
ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
message: e.to_string().into(),
code: -1,
}))
})
}
}
/// Iterator over a `RocksDB` table (non-transactional).
@@ -770,6 +938,7 @@ const fn convert_log_level(level: LogLevel) -> rocksdb::LogLevel {
#[cfg(test)]
mod tests {
use super::*;
use crate::providers::HistoryInfo;
use alloy_primitives::{Address, TxHash, B256};
use reth_db_api::{
models::{sharded_key::ShardedKey, storage_sharded_key::StorageShardedKey, IntegerList},
@@ -1094,4 +1263,158 @@ mod tests {
let last = provider.last::<TestTable>().unwrap();
assert_eq!(last, Some((20, b"value_20".to_vec())));
}
#[test]
fn test_account_history_info_single_shard() {
let temp_dir = TempDir::new().unwrap();
let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
let address = Address::from([0x42; 20]);
// Create a single shard with blocks [100, 200, 300] and highest_block = u64::MAX
// This is the "last shard" invariant
let chunk = IntegerList::new([100, 200, 300]).unwrap();
let shard_key = ShardedKey::new(address, u64::MAX);
provider.put::<tables::AccountsHistory>(shard_key, &chunk).unwrap();
let tx = provider.tx();
// Query for block 150: should find block 200 in changeset
let result = tx.account_history_info(address, 150, None).unwrap();
assert_eq!(result, HistoryInfo::InChangeset(200));
// Query for block 50: should return NotYetWritten (before first entry, no prev shard)
let result = tx.account_history_info(address, 50, None).unwrap();
assert_eq!(result, HistoryInfo::NotYetWritten);
// Query for block 300: should return InChangeset(300) - exact match means look at
// changeset at that block for the previous value
let result = tx.account_history_info(address, 300, None).unwrap();
assert_eq!(result, HistoryInfo::InChangeset(300));
// Query for block 500: should return InPlainState (after last entry in last shard)
let result = tx.account_history_info(address, 500, None).unwrap();
assert_eq!(result, HistoryInfo::InPlainState);
tx.rollback().unwrap();
}
#[test]
fn test_account_history_info_multiple_shards() {
let temp_dir = TempDir::new().unwrap();
let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
let address = Address::from([0x42; 20]);
// Create two shards: first shard ends at block 500, second is the last shard
let chunk1 = IntegerList::new([100, 200, 300, 400, 500]).unwrap();
let shard_key1 = ShardedKey::new(address, 500);
provider.put::<tables::AccountsHistory>(shard_key1, &chunk1).unwrap();
let chunk2 = IntegerList::new([600, 700, 800]).unwrap();
let shard_key2 = ShardedKey::new(address, u64::MAX);
provider.put::<tables::AccountsHistory>(shard_key2, &chunk2).unwrap();
let tx = provider.tx();
// Query for block 50: should return NotYetWritten (before first shard, no prev)
let result = tx.account_history_info(address, 50, None).unwrap();
assert_eq!(result, HistoryInfo::NotYetWritten);
// Query for block 150: should find block 200 in first shard's changeset
let result = tx.account_history_info(address, 150, None).unwrap();
assert_eq!(result, HistoryInfo::InChangeset(200));
// Query for block 550: should find block 600 in second shard's changeset
// prev() should detect first shard exists
let result = tx.account_history_info(address, 550, None).unwrap();
assert_eq!(result, HistoryInfo::InChangeset(600));
// Query for block 900: should return InPlainState (after last entry in last shard)
let result = tx.account_history_info(address, 900, None).unwrap();
assert_eq!(result, HistoryInfo::InPlainState);
tx.rollback().unwrap();
}
#[test]
fn test_account_history_info_no_history() {
let temp_dir = TempDir::new().unwrap();
let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
let address1 = Address::from([0x42; 20]);
let address2 = Address::from([0x43; 20]);
// Only add history for address1
let chunk = IntegerList::new([100, 200, 300]).unwrap();
let shard_key = ShardedKey::new(address1, u64::MAX);
provider.put::<tables::AccountsHistory>(shard_key, &chunk).unwrap();
let tx = provider.tx();
// Query for address2 (no history exists): should return NotYetWritten
let result = tx.account_history_info(address2, 150, None).unwrap();
assert_eq!(result, HistoryInfo::NotYetWritten);
tx.rollback().unwrap();
}
#[test]
fn test_account_history_info_pruned_before_first_entry() {
let temp_dir = TempDir::new().unwrap();
let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
let address = Address::from([0x42; 20]);
// Create a single shard starting at block 100
let chunk = IntegerList::new([100, 200, 300]).unwrap();
let shard_key = ShardedKey::new(address, u64::MAX);
provider.put::<tables::AccountsHistory>(shard_key, &chunk).unwrap();
let tx = provider.tx();
// Query for block 50 with lowest_available_block_number = 100
// This simulates a pruned state where data before block 100 is not available.
// Since we're before the first write AND pruning boundary is set, we need to
// check the changeset at the first write block.
let result = tx.account_history_info(address, 50, Some(100)).unwrap();
assert_eq!(result, HistoryInfo::InChangeset(100));
tx.rollback().unwrap();
}
#[test]
fn test_storage_history_info() {
let temp_dir = TempDir::new().unwrap();
let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
let address = Address::from([0x42; 20]);
let storage_key = B256::from([0x01; 32]);
// Create a single shard for this storage slot
let chunk = IntegerList::new([100, 200, 300]).unwrap();
let shard_key = StorageShardedKey::new(address, storage_key, u64::MAX);
provider.put::<tables::StoragesHistory>(shard_key, &chunk).unwrap();
let tx = provider.tx();
// Query for block 150: should find block 200 in changeset
let result = tx.storage_history_info(address, storage_key, 150, None).unwrap();
assert_eq!(result, HistoryInfo::InChangeset(200));
// Query for block 50: should return NotYetWritten
let result = tx.storage_history_info(address, storage_key, 50, None).unwrap();
assert_eq!(result, HistoryInfo::NotYetWritten);
// Query for block 500: should return InPlainState
let result = tx.storage_history_info(address, storage_key, 500, None).unwrap();
assert_eq!(result, HistoryInfo::InPlainState);
// Query for different storage key (no history): should return NotYetWritten
let other_key = B256::from([0x02; 32]);
let result = tx.storage_history_info(address, other_key, 150, None).unwrap();
assert_eq!(result, HistoryInfo::NotYetWritten);
tx.rollback().unwrap();
}
}

View File

@@ -1,20 +1,14 @@
use crate::{
AccountReader, BlockHashReader, ChangeSetReader, HashedPostStateProvider, ProviderError,
StateProvider, StateRootProvider,
AccountReader, BlockHashReader, ChangeSetReader, EitherReader, HashedPostStateProvider,
ProviderError, RocksDBProviderFactory, StateProvider, StateRootProvider,
};
use alloy_eips::merge::EPOCH_SLOTS;
use alloy_primitives::{Address, BlockNumber, Bytes, StorageKey, StorageValue, B256};
use reth_db_api::{
cursor::{DbCursorRO, DbDupCursorRO},
models::{storage_sharded_key::StorageShardedKey, ShardedKey},
table::Table,
tables,
transaction::DbTx,
BlockNumberList,
};
use reth_db_api::{cursor::DbDupCursorRO, tables, transaction::DbTx};
use reth_primitives_traits::{Account, Bytecode};
use reth_storage_api::{
BlockNumReader, BytecodeReader, DBProvider, StateProofProvider, StorageRootProvider,
BlockNumReader, BytecodeReader, DBProvider, NodePrimitivesProvider, StateProofProvider,
StorageRootProvider, StorageSettingsCache,
};
use reth_storage_errors::provider::ProviderResult;
use reth_trie::{
@@ -127,36 +121,61 @@ impl<'b, Provider: DBProvider + ChangeSetReader + BlockNumReader>
Self { provider, block_number, lowest_available_blocks }
}
/// Lookup an account in the `AccountsHistory` table
pub fn account_history_lookup(&self, address: Address) -> ProviderResult<HistoryInfo> {
/// Lookup an account in the `AccountsHistory` table using `EitherReader`.
pub fn account_history_lookup(&self, address: Address) -> ProviderResult<HistoryInfo>
where
Provider: StorageSettingsCache + RocksDBProviderFactory + NodePrimitivesProvider,
{
if !self.lowest_available_blocks.is_account_history_available(self.block_number) {
return Err(ProviderError::StateAtBlockPruned(self.block_number))
}
// history key to search IntegerList of block number changesets.
let history_key = ShardedKey::new(address, self.block_number);
self.history_info::<tables::AccountsHistory, _>(
history_key,
|key| key.key == address,
// Create RocksDB tx only when the feature is enabled
#[cfg(all(unix, feature = "rocksdb"))]
let rocks_provider = self.provider.rocksdb_provider();
#[cfg(all(unix, feature = "rocksdb"))]
let rocks_tx = rocks_provider.tx();
#[cfg(all(unix, feature = "rocksdb"))]
let rocks_tx_ref = &rocks_tx;
#[cfg(not(all(unix, feature = "rocksdb")))]
let rocks_tx_ref = ();
let mut reader = EitherReader::new_accounts_history(self.provider, rocks_tx_ref)?;
reader.account_history_info(
address,
self.block_number,
self.lowest_available_blocks.account_history_block_number,
)
}
/// Lookup a storage key in the `StoragesHistory` table
/// Lookup a storage key in the `StoragesHistory` table using `EitherReader`.
pub fn storage_history_lookup(
&self,
address: Address,
storage_key: StorageKey,
) -> ProviderResult<HistoryInfo> {
) -> ProviderResult<HistoryInfo>
where
Provider: StorageSettingsCache + RocksDBProviderFactory + NodePrimitivesProvider,
{
if !self.lowest_available_blocks.is_storage_history_available(self.block_number) {
return Err(ProviderError::StateAtBlockPruned(self.block_number))
}
// history key to search IntegerList of block number changesets.
let history_key = StorageShardedKey::new(address, storage_key, self.block_number);
self.history_info::<tables::StoragesHistory, _>(
history_key,
|key| key.address == address && key.sharded_key.key == storage_key,
// Create RocksDB tx only when the feature is enabled
#[cfg(all(unix, feature = "rocksdb"))]
let rocks_provider = self.provider.rocksdb_provider();
#[cfg(all(unix, feature = "rocksdb"))]
let rocks_tx = rocks_provider.tx();
#[cfg(all(unix, feature = "rocksdb"))]
let rocks_tx_ref = &rocks_tx;
#[cfg(not(all(unix, feature = "rocksdb")))]
let rocks_tx_ref = ();
let mut reader = EitherReader::new_storages_history(self.provider, rocks_tx_ref)?;
reader.storage_history_info(
address,
storage_key,
self.block_number,
self.lowest_available_blocks.storage_history_block_number,
)
}
@@ -204,57 +223,6 @@ impl<'b, Provider: DBProvider + ChangeSetReader + BlockNumReader>
Ok(HashedStorage::from_reverts(self.tx(), address, self.block_number)?)
}
fn history_info<T, K>(
&self,
key: K,
key_filter: impl Fn(&K) -> bool,
lowest_available_block_number: Option<BlockNumber>,
) -> ProviderResult<HistoryInfo>
where
T: Table<Key = K, Value = BlockNumberList>,
{
let mut cursor = self.tx().cursor_read::<T>()?;
// Lookup the history chunk in the history index. If the key does not appear in the
// index, the first chunk for the next key will be returned so we filter out chunks that
// have a different key.
if let Some(chunk) = cursor.seek(key)?.filter(|(key, _)| key_filter(key)).map(|x| x.1) {
// Get the rank of the first entry before or equal to our block.
let mut rank = chunk.rank(self.block_number);
// Adjust the rank, so that we have the rank of the first entry strictly before our
// block (not equal to it).
if rank.checked_sub(1).and_then(|r| chunk.select(r)) == Some(self.block_number) {
rank -= 1;
}
let found_block = chunk.select(rank);
// If our block is before the first entry in the index chunk and this first entry
// doesn't equal to our block, it might be before the first write ever. To check, we
// look at the previous entry and check if the key is the same.
// This check is worth it, the `cursor.prev()` check is rarely triggered (the if will
// short-circuit) and when it passes we save a full seek into the changeset/plain state
// table.
let is_before_first_write =
needs_prev_shard_check(rank, found_block, self.block_number) &&
!cursor.prev()?.is_some_and(|(key, _)| key_filter(&key));
Ok(HistoryInfo::from_lookup(
found_block,
is_before_first_write,
lowest_available_block_number,
))
} else if lowest_available_block_number.is_some() {
// The key may have been written, but due to pruning we may not have changesets and
// history, so we need to make a plain state lookup.
Ok(HistoryInfo::MaybeInPlainState)
} else {
// The key has not been written to at all.
Ok(HistoryInfo::NotYetWritten)
}
}
/// Set the lowest block number at which the account history is available.
pub const fn with_lowest_available_account_history_block_number(
mut self,
@@ -280,8 +248,14 @@ impl<Provider: DBProvider + BlockNumReader> HistoricalStateProviderRef<'_, Provi
}
}
impl<Provider: DBProvider + BlockNumReader + ChangeSetReader> AccountReader
for HistoricalStateProviderRef<'_, Provider>
impl<
Provider: DBProvider
+ BlockNumReader
+ ChangeSetReader
+ StorageSettingsCache
+ RocksDBProviderFactory
+ NodePrimitivesProvider,
> AccountReader for HistoricalStateProviderRef<'_, Provider>
{
/// Get basic account information.
fn basic_account(&self, address: &Address) -> ProviderResult<Option<Account>> {
@@ -436,8 +410,15 @@ impl<Provider> HashedPostStateProvider for HistoricalStateProviderRef<'_, Provid
}
}
impl<Provider: DBProvider + BlockNumReader + BlockHashReader + ChangeSetReader> StateProvider
for HistoricalStateProviderRef<'_, Provider>
impl<
Provider: DBProvider
+ BlockNumReader
+ BlockHashReader
+ ChangeSetReader
+ StorageSettingsCache
+ RocksDBProviderFactory
+ NodePrimitivesProvider,
> StateProvider for HistoricalStateProviderRef<'_, Provider>
{
/// Get storage.
fn storage(
@@ -527,7 +508,7 @@ impl<Provider: DBProvider + ChangeSetReader + BlockNumReader> HistoricalStatePro
}
// Delegates all provider impls to [HistoricalStateProviderRef]
reth_storage_api::macros::delegate_provider_impls!(HistoricalStateProvider<Provider> where [Provider: DBProvider + BlockNumReader + BlockHashReader + ChangeSetReader]);
reth_storage_api::macros::delegate_provider_impls!(HistoricalStateProvider<Provider> where [Provider: DBProvider + BlockNumReader + BlockHashReader + ChangeSetReader + StorageSettingsCache + RocksDBProviderFactory + NodePrimitivesProvider]);
/// Lowest blocks at which different parts of the state are available.
/// They may be [Some] if pruning is enabled.
@@ -561,7 +542,12 @@ impl LowestAvailableBlocks {
///
/// Returns `true` when `rank == 0` (first entry in shard) and the found block doesn't match
/// the target block number. In this case, we need to check if there's a previous shard.
fn needs_prev_shard_check(rank: u64, found_block: Option<u64>, block_number: BlockNumber) -> bool {
#[inline]
pub fn needs_prev_shard_check(
rank: u64,
found_block: Option<u64>,
block_number: BlockNumber,
) -> bool {
rank == 0 && found_block != Some(block_number)
}
@@ -571,7 +557,8 @@ mod tests {
use crate::{
providers::state::historical::{HistoryInfo, LowestAvailableBlocks},
test_utils::create_test_provider_factory,
AccountReader, HistoricalStateProvider, HistoricalStateProviderRef, StateProvider,
AccountReader, HistoricalStateProvider, HistoricalStateProviderRef, RocksDBProviderFactory,
StateProvider,
};
use alloy_primitives::{address, b256, Address, B256, U256};
use reth_db_api::{
@@ -583,6 +570,7 @@ mod tests {
use reth_primitives_traits::{Account, StorageEntry};
use reth_storage_api::{
BlockHashReader, BlockNumReader, ChangeSetReader, DBProvider, DatabaseProviderFactory,
NodePrimitivesProvider, StorageSettingsCache,
};
use reth_storage_errors::provider::ProviderError;
@@ -594,7 +582,13 @@ mod tests {
const fn assert_state_provider<T: StateProvider>() {}
#[expect(dead_code)]
const fn assert_historical_state_provider<
T: DBProvider + BlockNumReader + BlockHashReader + ChangeSetReader,
T: DBProvider
+ BlockNumReader
+ BlockHashReader
+ ChangeSetReader
+ StorageSettingsCache
+ RocksDBProviderFactory
+ NodePrimitivesProvider,
>() {
assert_state_provider::<HistoricalStateProvider<T>>();
}