feat(storage): wire RocksDB into history lookups via EitherReader

This wires RocksDB into the history lookup paths:

- Adds account_history_info and storage_history_info methods to EitherReader
- Updates HistoricalStateProviderRef to use EitherReader for lookups
- Adds RocksDBProviderFactory trait bounds to provider impls
- Uses the rank/select pattern for efficient binary search in shards

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
This commit is contained in:
yongkangc
2026-01-09 03:13:47 +00:00
parent 32e7f0572e
commit 8b453292fb
2 changed files with 207 additions and 84 deletions

View File

@@ -13,7 +13,9 @@ use crate::{
providers::{StaticFileProvider, StaticFileProviderRWRefMut},
StaticFileProviderFactory,
};
use alloy_primitives::{map::HashMap, Address, BlockNumber, TxHash, TxNumber};
use alloy_primitives::{map::HashMap, Address, BlockNumber, TxHash, TxNumber, B256};
use crate::providers::{needs_prev_shard_check, HistoryInfo};
use rayon::slice::ParallelSliceMut;
use reth_db::{
cursor::{DbCursorRO, DbDupCursorRW},
@@ -720,6 +722,76 @@ where
Self::RocksDB(tx) => tx.get::<tables::StoragesHistory>(key),
}
}
/// Lookup storage history and return [`HistoryInfo`] directly.
///
/// Uses the rank/select logic to efficiently find the first block >= target
/// where the storage slot was modified.
pub fn storage_history_info(
&mut self,
address: Address,
storage_key: B256,
block_number: BlockNumber,
lowest_available_block_number: Option<BlockNumber>,
) -> ProviderResult<HistoryInfo> {
match self {
Self::Database(cursor, _) => {
// Lookup the history chunk in the history index. If the key does not appear in the
// index, the first chunk for the next key will be returned so we filter out chunks
// that have a different key.
let key = StorageShardedKey::new(address, storage_key, block_number);
if let Some(chunk) = cursor
.seek(key)?
.filter(|(k, _)| k.address == address && k.sharded_key.key == storage_key)
.map(|x| x.1)
{
// Get the rank of the first entry before or equal to our block.
let mut rank = chunk.rank(block_number);
// Adjust the rank, so that we have the rank of the first entry strictly before
// our block (not equal to it).
if rank.checked_sub(1).and_then(|r| chunk.select(r)) == Some(block_number) {
rank -= 1;
}
let found_block = chunk.select(rank);
// If our block is before the first entry in the index chunk and this first
// entry doesn't equal to our block, it might be before the first write ever.
// To check, we look at the previous entry and check if the key is the same.
// This check is worth it, the `cursor.prev()` check is rarely triggered (the
// if will short-circuit) and when it passes we save a full seek into the
// changeset/plain state table.
let is_before_first_write =
needs_prev_shard_check(rank, found_block, block_number) &&
cursor.prev()?.is_none_or(|(k, _)| {
k.address != address || k.sharded_key.key != storage_key
});
Ok(HistoryInfo::from_lookup(
found_block,
is_before_first_write,
lowest_available_block_number,
))
} else if lowest_available_block_number.is_some() {
// The key may have been written, but due to pruning we may not have changesets
// and history, so we need to make a plain state lookup.
Ok(HistoryInfo::MaybeInPlainState)
} else {
// The key has not been written to at all.
Ok(HistoryInfo::NotYetWritten)
}
}
Self::StaticFile(_, _) => Err(ProviderError::UnsupportedProvider),
#[cfg(all(unix, feature = "rocksdb"))]
Self::RocksDB(tx) => tx.storage_history_info(
address,
storage_key,
block_number,
lowest_available_block_number,
),
}
}
}
impl<CURSOR, N: NodePrimitives> EitherReader<'_, CURSOR, N>
@@ -738,6 +810,68 @@ where
Self::RocksDB(tx) => tx.get::<tables::AccountsHistory>(key),
}
}
/// Lookup account history and return [`HistoryInfo`] directly.
///
/// Uses the rank/select logic to efficiently find the first block >= target
/// where the account was modified.
pub fn account_history_info(
&mut self,
address: Address,
block_number: BlockNumber,
lowest_available_block_number: Option<BlockNumber>,
) -> ProviderResult<HistoryInfo> {
match self {
Self::Database(cursor, _) => {
// Lookup the history chunk in the history index. If the key does not appear in the
// index, the first chunk for the next key will be returned so we filter out chunks
// that have a different key.
let key = ShardedKey::new(address, block_number);
if let Some(chunk) =
cursor.seek(key)?.filter(|(k, _)| k.key == address).map(|x| x.1)
{
// Get the rank of the first entry before or equal to our block.
let mut rank = chunk.rank(block_number);
// Adjust the rank, so that we have the rank of the first entry strictly before
// our block (not equal to it).
if rank.checked_sub(1).and_then(|r| chunk.select(r)) == Some(block_number) {
rank -= 1;
}
let found_block = chunk.select(rank);
// If our block is before the first entry in the index chunk and this first
// entry doesn't equal to our block, it might be before the first write ever.
// To check, we look at the previous entry and check if the key is the same.
// This check is worth it, the `cursor.prev()` check is rarely triggered (the
// if will short-circuit) and when it passes we save a full seek into the
// changeset/plain state table.
let is_before_first_write =
needs_prev_shard_check(rank, found_block, block_number) &&
cursor.prev()?.is_none_or(|(k, _)| k.key != address);
Ok(HistoryInfo::from_lookup(
found_block,
is_before_first_write,
lowest_available_block_number,
))
} else if lowest_available_block_number.is_some() {
// The key may have been written, but due to pruning we may not have changesets
// and history, so we need to make a plain state lookup.
Ok(HistoryInfo::MaybeInPlainState)
} else {
// The key has not been written to at all.
Ok(HistoryInfo::NotYetWritten)
}
}
Self::StaticFile(_, _) => Err(ProviderError::UnsupportedProvider),
#[cfg(all(unix, feature = "rocksdb"))]
Self::RocksDB(tx) => {
tx.account_history_info(address, block_number, lowest_available_block_number)
}
}
}
}
impl<CURSOR, N: NodePrimitives> EitherReader<'_, CURSOR, N>

View File

@@ -1,20 +1,14 @@
use crate::{
AccountReader, BlockHashReader, ChangeSetReader, HashedPostStateProvider, ProviderError,
StateProvider, StateRootProvider,
AccountReader, BlockHashReader, ChangeSetReader, EitherReader, HashedPostStateProvider,
ProviderError, RocksDBProviderFactory, StateProvider, StateRootProvider,
};
use alloy_eips::merge::EPOCH_SLOTS;
use alloy_primitives::{Address, BlockNumber, Bytes, StorageKey, StorageValue, B256};
use reth_db_api::{
cursor::{DbCursorRO, DbDupCursorRO},
models::{storage_sharded_key::StorageShardedKey, ShardedKey},
table::Table,
tables,
transaction::DbTx,
BlockNumberList,
};
use reth_db_api::{cursor::DbDupCursorRO, tables, transaction::DbTx};
use reth_primitives_traits::{Account, Bytecode};
use reth_storage_api::{
BlockNumReader, BytecodeReader, DBProvider, StateProofProvider, StorageRootProvider,
BlockNumReader, BytecodeReader, DBProvider, NodePrimitivesProvider, StateProofProvider,
StorageRootProvider, StorageSettingsCache,
};
use reth_storage_errors::provider::ProviderResult;
use reth_trie::{
@@ -127,36 +121,61 @@ impl<'b, Provider: DBProvider + ChangeSetReader + BlockNumReader>
Self { provider, block_number, lowest_available_blocks }
}
/// Lookup an account in the `AccountsHistory` table
pub fn account_history_lookup(&self, address: Address) -> ProviderResult<HistoryInfo> {
/// Lookup an account in the `AccountsHistory` table using `EitherReader`.
pub fn account_history_lookup(&self, address: Address) -> ProviderResult<HistoryInfo>
where
Provider: StorageSettingsCache + RocksDBProviderFactory + NodePrimitivesProvider,
{
if !self.lowest_available_blocks.is_account_history_available(self.block_number) {
return Err(ProviderError::StateAtBlockPruned(self.block_number))
}
// history key to search IntegerList of block number changesets.
let history_key = ShardedKey::new(address, self.block_number);
self.history_info::<tables::AccountsHistory, _>(
history_key,
|key| key.key == address,
// Create RocksDB tx only when the feature is enabled
#[cfg(all(unix, feature = "rocksdb"))]
let rocks_provider = self.provider.rocksdb_provider();
#[cfg(all(unix, feature = "rocksdb"))]
let rocks_tx = rocks_provider.tx();
#[cfg(all(unix, feature = "rocksdb"))]
let rocks_tx_ref = &rocks_tx;
#[cfg(not(all(unix, feature = "rocksdb")))]
let rocks_tx_ref = ();
let mut reader = EitherReader::new_accounts_history(self.provider, rocks_tx_ref)?;
reader.account_history_info(
address,
self.block_number,
self.lowest_available_blocks.account_history_block_number,
)
}
/// Lookup a storage key in the `StoragesHistory` table
/// Lookup a storage key in the `StoragesHistory` table using `EitherReader`.
pub fn storage_history_lookup(
&self,
address: Address,
storage_key: StorageKey,
) -> ProviderResult<HistoryInfo> {
) -> ProviderResult<HistoryInfo>
where
Provider: StorageSettingsCache + RocksDBProviderFactory + NodePrimitivesProvider,
{
if !self.lowest_available_blocks.is_storage_history_available(self.block_number) {
return Err(ProviderError::StateAtBlockPruned(self.block_number))
}
// history key to search IntegerList of block number changesets.
let history_key = StorageShardedKey::new(address, storage_key, self.block_number);
self.history_info::<tables::StoragesHistory, _>(
history_key,
|key| key.address == address && key.sharded_key.key == storage_key,
// Create RocksDB tx only when the feature is enabled
#[cfg(all(unix, feature = "rocksdb"))]
let rocks_provider = self.provider.rocksdb_provider();
#[cfg(all(unix, feature = "rocksdb"))]
let rocks_tx = rocks_provider.tx();
#[cfg(all(unix, feature = "rocksdb"))]
let rocks_tx_ref = &rocks_tx;
#[cfg(not(all(unix, feature = "rocksdb")))]
let rocks_tx_ref = ();
let mut reader = EitherReader::new_storages_history(self.provider, rocks_tx_ref)?;
reader.storage_history_info(
address,
storage_key,
self.block_number,
self.lowest_available_blocks.storage_history_block_number,
)
}
@@ -204,57 +223,6 @@ impl<'b, Provider: DBProvider + ChangeSetReader + BlockNumReader>
Ok(HashedStorage::from_reverts(self.tx(), address, self.block_number)?)
}
fn history_info<T, K>(
&self,
key: K,
key_filter: impl Fn(&K) -> bool,
lowest_available_block_number: Option<BlockNumber>,
) -> ProviderResult<HistoryInfo>
where
T: Table<Key = K, Value = BlockNumberList>,
{
let mut cursor = self.tx().cursor_read::<T>()?;
// Lookup the history chunk in the history index. If the key does not appear in the
// index, the first chunk for the next key will be returned so we filter out chunks that
// have a different key.
if let Some(chunk) = cursor.seek(key)?.filter(|(key, _)| key_filter(key)).map(|x| x.1) {
// Get the rank of the first entry before or equal to our block.
let mut rank = chunk.rank(self.block_number);
// Adjust the rank, so that we have the rank of the first entry strictly before our
// block (not equal to it).
if rank.checked_sub(1).and_then(|r| chunk.select(r)) == Some(self.block_number) {
rank -= 1;
}
let found_block = chunk.select(rank);
// If our block is before the first entry in the index chunk and this first entry
// doesn't equal to our block, it might be before the first write ever. To check, we
// look at the previous entry and check if the key is the same.
// This check is worth it, the `cursor.prev()` check is rarely triggered (the if will
// short-circuit) and when it passes we save a full seek into the changeset/plain state
// table.
let is_before_first_write =
needs_prev_shard_check(rank, found_block, self.block_number) &&
!cursor.prev()?.is_some_and(|(key, _)| key_filter(&key));
Ok(HistoryInfo::from_lookup(
found_block,
is_before_first_write,
lowest_available_block_number,
))
} else if lowest_available_block_number.is_some() {
// The key may have been written, but due to pruning we may not have changesets and
// history, so we need to make a plain state lookup.
Ok(HistoryInfo::MaybeInPlainState)
} else {
// The key has not been written to at all.
Ok(HistoryInfo::NotYetWritten)
}
}
/// Set the lowest block number at which the account history is available.
pub const fn with_lowest_available_account_history_block_number(
mut self,
@@ -280,8 +248,14 @@ impl<Provider: DBProvider + BlockNumReader> HistoricalStateProviderRef<'_, Provi
}
}
impl<Provider: DBProvider + BlockNumReader + ChangeSetReader> AccountReader
for HistoricalStateProviderRef<'_, Provider>
impl<
Provider: DBProvider
+ BlockNumReader
+ ChangeSetReader
+ StorageSettingsCache
+ RocksDBProviderFactory
+ NodePrimitivesProvider,
> AccountReader for HistoricalStateProviderRef<'_, Provider>
{
/// Get basic account information.
fn basic_account(&self, address: &Address) -> ProviderResult<Option<Account>> {
@@ -436,8 +410,15 @@ impl<Provider> HashedPostStateProvider for HistoricalStateProviderRef<'_, Provid
}
}
impl<Provider: DBProvider + BlockNumReader + BlockHashReader + ChangeSetReader> StateProvider
for HistoricalStateProviderRef<'_, Provider>
impl<
Provider: DBProvider
+ BlockNumReader
+ BlockHashReader
+ ChangeSetReader
+ StorageSettingsCache
+ RocksDBProviderFactory
+ NodePrimitivesProvider,
> StateProvider for HistoricalStateProviderRef<'_, Provider>
{
/// Get storage.
fn storage(
@@ -527,7 +508,7 @@ impl<Provider: DBProvider + ChangeSetReader + BlockNumReader> HistoricalStatePro
}
// Delegates all provider impls to [HistoricalStateProviderRef]
reth_storage_api::macros::delegate_provider_impls!(HistoricalStateProvider<Provider> where [Provider: DBProvider + BlockNumReader + BlockHashReader + ChangeSetReader]);
reth_storage_api::macros::delegate_provider_impls!(HistoricalStateProvider<Provider> where [Provider: DBProvider + BlockNumReader + BlockHashReader + ChangeSetReader + StorageSettingsCache + RocksDBProviderFactory + NodePrimitivesProvider]);
/// Lowest blocks at which different parts of the state are available.
/// They may be [Some] if pruning is enabled.
@@ -576,7 +557,8 @@ mod tests {
use crate::{
providers::state::historical::{HistoryInfo, LowestAvailableBlocks},
test_utils::create_test_provider_factory,
AccountReader, HistoricalStateProvider, HistoricalStateProviderRef, StateProvider,
AccountReader, HistoricalStateProvider, HistoricalStateProviderRef, RocksDBProviderFactory,
StateProvider,
};
use alloy_primitives::{address, b256, Address, B256, U256};
use reth_db_api::{
@@ -588,6 +570,7 @@ mod tests {
use reth_primitives_traits::{Account, StorageEntry};
use reth_storage_api::{
BlockHashReader, BlockNumReader, ChangeSetReader, DBProvider, DatabaseProviderFactory,
NodePrimitivesProvider, StorageSettingsCache,
};
use reth_storage_errors::provider::ProviderError;
@@ -599,7 +582,13 @@ mod tests {
const fn assert_state_provider<T: StateProvider>() {}
#[expect(dead_code)]
const fn assert_historical_state_provider<
T: DBProvider + BlockNumReader + BlockHashReader + ChangeSetReader,
T: DBProvider
+ BlockNumReader
+ BlockHashReader
+ ChangeSetReader
+ StorageSettingsCache
+ RocksDBProviderFactory
+ NodePrimitivesProvider,
>() {
assert_state_provider::<HistoricalStateProvider<T>>();
}