feat(storage): add EitherReader for routing history queries to MDBX or RocksDB (#21063)

Co-authored-by: joshieDo <93316087+joshieDo@users.noreply.github.com>
YK authored on 2026-01-17 01:44:43 +08:00, committed by GitHub
parent 1be9fab5bf
commit 13c32625bc
6 changed files with 555 additions and 162 deletions

View File

@@ -16,7 +16,7 @@ use crate::{
HeaderSyncGapProvider, HistoricalStateProvider, HistoricalStateProviderRef, HistoryWriter,
LatestStateProvider, LatestStateProviderRef, OriginalValuesKnown, ProviderError,
PruneCheckpointReader, PruneCheckpointWriter, RawRocksDBBatch, RevertsInit, RocksBatchArg,
RocksDBProviderFactory, RocksTxRefArg, StageCheckpointReader, StateProviderBox, StateWriter,
RocksDBProviderFactory, StageCheckpointReader, StateProviderBox, StateWriter,
StaticFileProviderFactory, StatsReader, StorageReader, StorageTrieWriter, TransactionVariant,
TransactionsProvider, TransactionsProviderExt, TrieWriter,
};
@@ -889,25 +889,6 @@ impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
pub fn chain_spec(&self) -> &N::ChainSpec {
&self.chain_spec
}
/// Executes a closure with a `RocksDB` transaction for reading.
///
/// This helper encapsulates all the cfg-gated `RocksDB` transaction handling for reads.
fn with_rocksdb_tx<F, R>(&self, f: F) -> ProviderResult<R>
where
F: FnOnce(RocksTxRefArg<'_>) -> ProviderResult<R>,
{
#[cfg(all(unix, feature = "rocksdb"))]
let rocksdb = self.rocksdb_provider();
#[cfg(all(unix, feature = "rocksdb"))]
let rocksdb_tx = rocksdb.tx();
#[cfg(all(unix, feature = "rocksdb"))]
let rocksdb_tx_ref = &rocksdb_tx;
#[cfg(not(all(unix, feature = "rocksdb")))]
let rocksdb_tx_ref = ();
f(rocksdb_tx_ref)
}
}
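Note: this helper is not deleted outright - the same cfg-gated logic reappears as a default method on the `RocksDBProviderFactory` trait at the end of this diff, so every provider implementing that trait inherits it. A minimal sketch of the call pattern, mirroring `account_history_lookup` later in this diff (`provider`, `address`, `block_number` and `lowest_available_block` are placeholders):

    let info = provider.with_rocksdb_tx(|rocks_tx_ref| {
        // `rocks_tx_ref` is a `RocksDB` transaction reference when built with the
        // `rocksdb` feature on unix, and `()` otherwise.
        let mut reader = EitherReader::new_accounts_history(&provider, rocks_tx_ref)?;
        reader.account_history_info(address, block_number, lowest_available_block)
    })?;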
impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {

View File

@@ -16,8 +16,8 @@ pub use static_file::{
mod state;
pub use state::{
historical::{
history_info, needs_prev_shard_check, HistoricalStateProvider, HistoricalStateProviderRef,
HistoryInfo, LowestAvailableBlocks,
compute_history_rank, history_info, needs_prev_shard_check, HistoricalStateProvider,
HistoricalStateProviderRef, HistoryInfo, LowestAvailableBlocks,
},
latest::{LatestStateProvider, LatestStateProviderRef},
overlay::{OverlayStateProvider, OverlayStateProviderFactory},

View File

@@ -164,16 +164,7 @@ impl RocksDBProvider {
self.prune_transaction_hash_numbers_in_range(provider, 0..=highest_tx)?;
}
(None, None) => {
// Both MDBX and static files are empty.
// If checkpoint says we should have data, that's an inconsistency.
if checkpoint > 0 {
tracing::warn!(
target: "reth::providers::rocksdb",
checkpoint,
"Checkpoint set but no transaction data exists, unwind needed"
);
return Ok(Some(0));
}
// Both MDBX and static files are empty, nothing to check.
}
}
@@ -263,16 +254,27 @@ impl RocksDBProvider {
}
// Find the max highest_block_number (excluding u64::MAX sentinel) across all
// entries
// entries. Also track if we found any non-sentinel entries.
let mut max_highest_block = 0u64;
let mut found_non_sentinel = false;
for result in self.iter::<tables::StoragesHistory>()? {
let (key, _) = result?;
let highest = key.sharded_key.highest_block_number;
if highest != u64::MAX && highest > max_highest_block {
max_highest_block = highest;
if highest != u64::MAX {
found_non_sentinel = true;
if highest > max_highest_block {
max_highest_block = highest;
}
}
}
// If all entries are sentinel entries (u64::MAX), treat this as a first-run scenario:
// no completed shards exist (only sentinel shards with highest_block_number = u64::MAX),
// so no actual history has been indexed yet.
if !found_non_sentinel {
return Ok(None);
}
// If any entry has highest_block > checkpoint, prune excess
if max_highest_block > checkpoint {
tracing::info!(
@@ -296,11 +298,7 @@ impl RocksDBProvider {
Ok(None)
}
None => {
// Empty RocksDB table
if checkpoint > 0 {
// Stage says we should have data but we don't
return Ok(Some(0));
}
// Empty RocksDB table, nothing to check.
Ok(None)
}
}
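For context on the sentinel check above: each history key has one shard per completed chunk, keyed by the highest block number it contains, plus a single open shard stored under the `u64::MAX` sentinel (see the `append_*_history_shard` helpers later in this diff). A rough sketch of the layout the scan distinguishes, using the same key types (`address` is a placeholder):

    // Completed shard for this address: indices up to and including block 1_999.
    let completed = ShardedKey::new(address, 1_999);
    // Open ("sentinel") shard: still accumulating indices, keyed by u64::MAX.
    let sentinel = ShardedKey::new(address, u64::MAX);
    // The consistency check only compares completed shards against the stage checkpoint;
    // a table holding nothing but sentinel shards is now treated as a first run (Ok(None)).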
@@ -377,16 +375,27 @@ impl RocksDBProvider {
}
// Find the max highest_block_number (excluding u64::MAX sentinel) across all
// entries
// entries. Also track if we found any non-sentinel entries.
let mut max_highest_block = 0u64;
let mut found_non_sentinel = false;
for result in self.iter::<tables::AccountsHistory>()? {
let (key, _) = result?;
let highest = key.highest_block_number;
if highest != u64::MAX && highest > max_highest_block {
max_highest_block = highest;
if highest != u64::MAX {
found_non_sentinel = true;
if highest > max_highest_block {
max_highest_block = highest;
}
}
}
// If all entries are sentinel entries (u64::MAX), treat this as a first-run scenario:
// no completed shards exist (only sentinel shards with highest_block_number = u64::MAX),
// so no actual history has been indexed yet.
if !found_non_sentinel {
return Ok(None);
}
// If any entry has highest_block > checkpoint, prune excess
if max_highest_block > checkpoint {
tracing::info!(
@@ -413,11 +422,7 @@ impl RocksDBProvider {
Ok(None)
}
None => {
// Empty RocksDB table
if checkpoint > 0 {
// Stage says we should have data but we don't
return Ok(Some(0));
}
// Empty RocksDB table, nothing to check.
Ok(None)
}
}
@@ -542,7 +547,7 @@ mod tests {
}
#[test]
fn test_check_consistency_empty_rocksdb_with_checkpoint_needs_unwind() {
fn test_check_consistency_empty_rocksdb_with_checkpoint_is_first_run() {
let temp_dir = TempDir::new().unwrap();
let rocksdb = RocksDBBuilder::new(temp_dir.path())
.with_table::<tables::TransactionHashNumbers>()
@@ -566,10 +571,10 @@ mod tests {
let provider = factory.database_provider_ro().unwrap();
// RocksDB is empty but checkpoint says block 100 was processed
// This means RocksDB is missing data and we need to unwind to rebuild
// RocksDB is empty but checkpoint says block 100 was processed.
// This is treated as a first-run/migration scenario - no unwind needed.
let result = rocksdb.check_consistency(&provider).unwrap();
assert_eq!(result, Some(0), "Should require unwind to block 0 to rebuild RocksDB");
assert_eq!(result, None, "Empty data with checkpoint is treated as first run");
}
#[test]
@@ -650,7 +655,7 @@ mod tests {
}
#[test]
fn test_check_consistency_storages_history_empty_with_checkpoint_needs_unwind() {
fn test_check_consistency_storages_history_empty_with_checkpoint_is_first_run() {
let temp_dir = TempDir::new().unwrap();
let rocksdb = RocksDBBuilder::new(temp_dir.path())
.with_table::<tables::StoragesHistory>()
@@ -674,9 +679,10 @@ mod tests {
let provider = factory.database_provider_ro().unwrap();
// RocksDB is empty but checkpoint says block 100 was processed
// RocksDB is empty but checkpoint says block 100 was processed.
// This is treated as a first-run/migration scenario - no unwind needed.
let result = rocksdb.check_consistency(&provider).unwrap();
assert_eq!(result, Some(0), "Should require unwind to block 0 to rebuild StoragesHistory");
assert_eq!(result, None, "Empty RocksDB with checkpoint is treated as first run");
}
#[test]
@@ -978,6 +984,97 @@ mod tests {
);
}
#[test]
fn test_check_consistency_storages_history_sentinel_only_with_checkpoint_is_first_run() {
let temp_dir = TempDir::new().unwrap();
let rocksdb = RocksDBBuilder::new(temp_dir.path())
.with_table::<tables::StoragesHistory>()
.build()
.unwrap();
// Insert ONLY sentinel entries (highest_block_number = u64::MAX)
// This simulates a scenario where history tracking started but no shards were completed
let key_sentinel_1 = StorageShardedKey::new(Address::ZERO, B256::ZERO, u64::MAX);
let key_sentinel_2 = StorageShardedKey::new(Address::random(), B256::random(), u64::MAX);
let block_list = BlockNumberList::new_pre_sorted([10, 20, 30]);
rocksdb.put::<tables::StoragesHistory>(key_sentinel_1, &block_list).unwrap();
rocksdb.put::<tables::StoragesHistory>(key_sentinel_2, &block_list).unwrap();
// Verify entries exist (not empty table)
assert!(rocksdb.first::<tables::StoragesHistory>().unwrap().is_some());
// Create a test provider factory for MDBX
let factory = create_test_provider_factory();
factory.set_storage_settings_cache(
StorageSettings::legacy().with_storages_history_in_rocksdb(true),
);
// Set a checkpoint indicating we should have processed up to block 100
{
let provider = factory.database_provider_rw().unwrap();
provider
.save_stage_checkpoint(StageId::IndexStorageHistory, StageCheckpoint::new(100))
.unwrap();
provider.commit().unwrap();
}
let provider = factory.database_provider_ro().unwrap();
// RocksDB has only sentinel entries (no completed shards) but checkpoint is set.
// This is treated as a first-run/migration scenario - no unwind needed.
let result = rocksdb.check_consistency(&provider).unwrap();
assert_eq!(
result, None,
"Sentinel-only entries with checkpoint should be treated as first run"
);
}
#[test]
fn test_check_consistency_accounts_history_sentinel_only_with_checkpoint_is_first_run() {
use reth_db_api::models::ShardedKey;
let temp_dir = TempDir::new().unwrap();
let rocksdb = RocksDBBuilder::new(temp_dir.path())
.with_table::<tables::AccountsHistory>()
.build()
.unwrap();
// Insert ONLY sentinel entries (highest_block_number = u64::MAX)
let key_sentinel_1 = ShardedKey::new(Address::ZERO, u64::MAX);
let key_sentinel_2 = ShardedKey::new(Address::random(), u64::MAX);
let block_list = BlockNumberList::new_pre_sorted([10, 20, 30]);
rocksdb.put::<tables::AccountsHistory>(key_sentinel_1, &block_list).unwrap();
rocksdb.put::<tables::AccountsHistory>(key_sentinel_2, &block_list).unwrap();
// Verify entries exist (not empty table)
assert!(rocksdb.first::<tables::AccountsHistory>().unwrap().is_some());
// Create a test provider factory for MDBX
let factory = create_test_provider_factory();
factory.set_storage_settings_cache(
StorageSettings::legacy().with_account_history_in_rocksdb(true),
);
// Set a checkpoint indicating we should have processed up to block 100
{
let provider = factory.database_provider_rw().unwrap();
provider
.save_stage_checkpoint(StageId::IndexAccountHistory, StageCheckpoint::new(100))
.unwrap();
provider.commit().unwrap();
}
let provider = factory.database_provider_ro().unwrap();
// RocksDB has only sentinel entries (no completed shards) but checkpoint is set.
// This is treated as a first-run/migration scenario - no unwind needed.
let result = rocksdb.check_consistency(&provider).unwrap();
assert_eq!(
result, None,
"Sentinel-only entries with checkpoint should be treated as first run"
);
}
#[test]
fn test_check_consistency_storages_history_behind_checkpoint_single_entry() {
use reth_db_api::models::storage_sharded_key::StorageShardedKey;
@@ -1135,7 +1232,7 @@ mod tests {
}
#[test]
fn test_check_consistency_accounts_history_empty_with_checkpoint_needs_unwind() {
fn test_check_consistency_accounts_history_empty_with_checkpoint_is_first_run() {
let temp_dir = TempDir::new().unwrap();
let rocksdb = RocksDBBuilder::new(temp_dir.path())
.with_table::<tables::AccountsHistory>()
@@ -1159,9 +1256,10 @@ mod tests {
let provider = factory.database_provider_ro().unwrap();
// RocksDB is empty but checkpoint says block 100 was processed
// RocksDB is empty but checkpoint says block 100 was processed.
// This is treated as a first-run/migration scenario - no unwind needed.
let result = rocksdb.check_consistency(&provider).unwrap();
assert_eq!(result, Some(0), "Should require unwind to block 0 to rebuild AccountsHistory");
assert_eq!(result, None, "Empty RocksDB with checkpoint is treated as first run");
}
#[test]

View File

@@ -1,11 +1,15 @@
use super::metrics::{RocksDBMetrics, RocksDBOperation};
use crate::providers::{needs_prev_shard_check, HistoryInfo};
use crate::providers::{compute_history_rank, needs_prev_shard_check, HistoryInfo};
use alloy_consensus::transaction::TxHashRef;
use alloy_primitives::{Address, BlockNumber, TxNumber, B256};
use itertools::Itertools;
use parking_lot::Mutex;
use reth_chain_state::ExecutedBlock;
use reth_db_api::{
models::{storage_sharded_key::StorageShardedKey, ShardedKey, StorageSettings},
models::{
sharded_key::NUM_OF_INDICES_IN_SHARD, storage_sharded_key::StorageShardedKey, ShardedKey,
StorageSettings,
},
table::{Compress, Decode, Decompress, Encode, Table},
tables, BlockNumberList, DatabaseError,
};
@@ -592,10 +596,10 @@ impl RocksDBProvider {
account_history.entry(address).or_default().push(block_number);
}
}
for (address, blocks) in account_history {
let key = ShardedKey::new(address, u64::MAX);
let value = BlockNumberList::new_pre_sorted(blocks);
batch.put::<tables::AccountsHistory>(key, &value)?;
// Write account history using proper shard append logic
for (address, indices) in account_history {
batch.append_account_history_shard(address, indices)?;
}
ctx.pending_batches.lock().push(batch.into_inner());
Ok(())
@@ -620,10 +624,10 @@ impl RocksDBProvider {
}
}
}
for ((address, slot), blocks) in storage_history {
let key = StorageShardedKey::new(address, slot, u64::MAX);
let value = BlockNumberList::new_pre_sorted(blocks);
batch.put::<tables::StoragesHistory>(key, &value)?;
// Write storage history using proper shard append logic
for ((address, slot), indices) in storage_history {
batch.append_storage_history_shard(address, slot, indices)?;
}
ctx.pending_batches.lock().push(batch.into_inner());
Ok(())
@@ -714,6 +718,129 @@ impl<'a> RocksDBBatch<'a> {
pub fn into_inner(self) -> WriteBatchWithTransaction<true> {
self.inner
}
/// Appends indices to an account history shard with proper shard management.
///
/// Loads the existing shard (if any), appends the new indices, and rechunks into
/// multiple shards if needed (respecting the `NUM_OF_INDICES_IN_SHARD` limit).
///
/// # Requirements
///
/// - The `indices` MUST be strictly increasing and contain no duplicates.
/// - This method MUST only be called once per address per batch. The batch reads existing
/// shards from committed DB state, not from pending writes. Calling twice for the same
/// address will cause the second call to overwrite the first.
pub fn append_account_history_shard(
&mut self,
address: Address,
indices: impl IntoIterator<Item = u64>,
) -> ProviderResult<()> {
let indices: Vec<u64> = indices.into_iter().collect();
if indices.is_empty() {
return Ok(());
}
debug_assert!(
indices.windows(2).all(|w| w[0] < w[1]),
"indices must be strictly increasing: {:?}",
indices
);
let last_key = ShardedKey::new(address, u64::MAX);
let last_shard_opt = self.provider.get::<tables::AccountsHistory>(last_key.clone())?;
let mut last_shard = last_shard_opt.unwrap_or_else(BlockNumberList::empty);
last_shard.append(indices).map_err(ProviderError::other)?;
// Fast path: all indices fit in one shard
if last_shard.len() <= NUM_OF_INDICES_IN_SHARD as u64 {
self.put::<tables::AccountsHistory>(last_key, &last_shard)?;
return Ok(());
}
// Slow path: rechunk into multiple shards
let chunks = last_shard.iter().chunks(NUM_OF_INDICES_IN_SHARD);
let mut chunks_peekable = chunks.into_iter().peekable();
while let Some(chunk) = chunks_peekable.next() {
let shard = BlockNumberList::new_pre_sorted(chunk);
let highest_block_number = if chunks_peekable.peek().is_some() {
shard.iter().next_back().expect("`chunks` does not return empty list")
} else {
u64::MAX
};
self.put::<tables::AccountsHistory>(
ShardedKey::new(address, highest_block_number),
&shard,
)?;
}
Ok(())
}
/// Appends indices to a storage history shard with proper shard management.
///
/// Loads the existing shard (if any), appends the new indices, and rechunks into
/// multiple shards if needed (respecting the `NUM_OF_INDICES_IN_SHARD` limit).
///
/// # Requirements
///
/// - The `indices` MUST be strictly increasing and contain no duplicates.
/// - This method MUST only be called once per (address, `storage_key`) pair per batch. The
/// batch reads existing shards from committed DB state, not from pending writes. Calling
/// twice for the same key will cause the second call to overwrite the first.
pub fn append_storage_history_shard(
&mut self,
address: Address,
storage_key: B256,
indices: impl IntoIterator<Item = u64>,
) -> ProviderResult<()> {
let indices: Vec<u64> = indices.into_iter().collect();
if indices.is_empty() {
return Ok(());
}
debug_assert!(
indices.windows(2).all(|w| w[0] < w[1]),
"indices must be strictly increasing: {:?}",
indices
);
let last_key = StorageShardedKey::last(address, storage_key);
let last_shard_opt = self.provider.get::<tables::StoragesHistory>(last_key.clone())?;
let mut last_shard = last_shard_opt.unwrap_or_else(BlockNumberList::empty);
last_shard.append(indices).map_err(ProviderError::other)?;
// Fast path: all indices fit in one shard
if last_shard.len() <= NUM_OF_INDICES_IN_SHARD as u64 {
self.put::<tables::StoragesHistory>(last_key, &last_shard)?;
return Ok(());
}
// Slow path: rechunk into multiple shards
let chunks = last_shard.iter().chunks(NUM_OF_INDICES_IN_SHARD);
let mut chunks_peekable = chunks.into_iter().peekable();
while let Some(chunk) = chunks_peekable.next() {
let shard = BlockNumberList::new_pre_sorted(chunk);
let highest_block_number = if chunks_peekable.peek().is_some() {
shard.iter().next_back().expect("`chunks` does not return empty list")
} else {
u64::MAX
};
self.put::<tables::StoragesHistory>(
StorageShardedKey::new(address, storage_key, highest_block_number),
&shard,
)?;
}
Ok(())
}
}
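To make the rechunking concrete: appending one index past the shard limit leaves a full shard keyed by its last block plus a fresh sentinel shard holding the overflow, which is exactly what the new boundary tests below assert. A short usage sketch (same API as the tests; `provider` and `address` are placeholders):

    let limit = NUM_OF_INDICES_IN_SHARD as u64;
    let mut batch = provider.batch();
    batch.append_account_history_shard(address, 0..=limit)?; // limit + 1 indices
    batch.commit()?;
    // Resulting entries, per test_account_history_shard_split_at_boundary:
    //   ShardedKey::new(address, limit - 1) -> full shard with `limit` indices
    //   ShardedKey::new(address, u64::MAX)  -> sentinel shard with the single overflow index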
/// `RocksDB` transaction wrapper providing MDBX-like semantics.
@@ -901,6 +1028,16 @@ impl<'db> RocksTx<'db> {
where
T: Table<Value = BlockNumberList>,
{
// History may be pruned if a lowest available block is set.
let is_maybe_pruned = lowest_available_block_number.is_some();
let fallback = || {
Ok(if is_maybe_pruned {
HistoryInfo::MaybeInPlainState
} else {
HistoryInfo::NotYetWritten
})
};
let cf = self.provider.0.db.cf_handle(T::NAME).ok_or_else(|| {
ProviderError::Database(DatabaseError::Other(format!(
"column family not found: {}",
@@ -918,53 +1055,28 @@ impl<'db> RocksTx<'db> {
if !iter.valid() {
// No shard found at or after target block.
return if lowest_available_block_number.is_some() {
// The key may have been written, but due to pruning we may not have changesets
// and history, so we need to make a plain state lookup.
Ok(HistoryInfo::MaybeInPlainState)
} else {
// The key has not been written to at all.
Ok(HistoryInfo::NotYetWritten)
};
//
// (`HistoryInfo::MaybeInPlainState`) The key may have been written, but due to pruning we may
// not have changesets and history, so we need to make a plain state lookup.
// (`HistoryInfo::NotYetWritten`) The key has not been written to at all.
return fallback();
}
// Check if the found key matches our target entity.
let Some(key_bytes) = iter.key() else {
return if lowest_available_block_number.is_some() {
Ok(HistoryInfo::MaybeInPlainState)
} else {
Ok(HistoryInfo::NotYetWritten)
};
return fallback();
};
if !key_matches(key_bytes)? {
// The found key is for a different entity.
return if lowest_available_block_number.is_some() {
Ok(HistoryInfo::MaybeInPlainState)
} else {
Ok(HistoryInfo::NotYetWritten)
};
return fallback();
}
// Decompress the block list for this shard.
let Some(value_bytes) = iter.value() else {
return if lowest_available_block_number.is_some() {
Ok(HistoryInfo::MaybeInPlainState)
} else {
Ok(HistoryInfo::NotYetWritten)
};
return fallback();
};
let chunk = BlockNumberList::decompress(value_bytes)?;
// Get the rank of the first entry before or equal to our block.
let mut rank = chunk.rank(block_number);
// Adjust the rank, so that we have the rank of the first entry strictly before our
// block (not equal to it).
if rank.checked_sub(1).and_then(|r| chunk.select(r)) == Some(block_number) {
rank -= 1;
}
let found_block = chunk.select(rank);
let (rank, found_block) = compute_history_rank(&chunk, block_number);
// Lazy check for previous shard - only called when needed.
// If we can step to a previous shard for this same key, history already exists,
@@ -1103,7 +1215,11 @@ mod tests {
use crate::providers::HistoryInfo;
use alloy_primitives::{Address, TxHash, B256};
use reth_db_api::{
models::{sharded_key::ShardedKey, storage_sharded_key::StorageShardedKey, IntegerList},
models::{
sharded_key::{ShardedKey, NUM_OF_INDICES_IN_SHARD},
storage_sharded_key::StorageShardedKey,
IntegerList,
},
table::Table,
tables,
};
@@ -1452,4 +1568,156 @@ mod tests {
tx.rollback().unwrap();
}
#[test]
fn test_account_history_shard_split_at_boundary() {
let temp_dir = TempDir::new().unwrap();
let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
let address = Address::from([0x42; 20]);
let limit = NUM_OF_INDICES_IN_SHARD;
// Add exactly NUM_OF_INDICES_IN_SHARD + 1 indices to trigger a split
let indices: Vec<u64> = (0..=(limit as u64)).collect();
let mut batch = provider.batch();
batch.append_account_history_shard(address, indices).unwrap();
batch.commit().unwrap();
// Should have 2 shards: one completed shard and one sentinel shard
let completed_key = ShardedKey::new(address, (limit - 1) as u64);
let sentinel_key = ShardedKey::new(address, u64::MAX);
let completed_shard = provider.get::<tables::AccountsHistory>(completed_key).unwrap();
let sentinel_shard = provider.get::<tables::AccountsHistory>(sentinel_key).unwrap();
assert!(completed_shard.is_some(), "completed shard should exist");
assert!(sentinel_shard.is_some(), "sentinel shard should exist");
let completed_shard = completed_shard.unwrap();
let sentinel_shard = sentinel_shard.unwrap();
assert_eq!(completed_shard.len(), limit as u64, "completed shard should be full");
assert_eq!(sentinel_shard.len(), 1, "sentinel shard should have 1 element");
}
#[test]
fn test_account_history_multiple_shard_splits() {
let temp_dir = TempDir::new().unwrap();
let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
let address = Address::from([0x43; 20]);
let limit = NUM_OF_INDICES_IN_SHARD;
// First batch: add NUM_OF_INDICES_IN_SHARD indices
let first_batch_indices: Vec<u64> = (0..limit as u64).collect();
let mut batch = provider.batch();
batch.append_account_history_shard(address, first_batch_indices).unwrap();
batch.commit().unwrap();
// Should have just a sentinel shard (exactly at limit, not over)
let sentinel_key = ShardedKey::new(address, u64::MAX);
let shard = provider.get::<tables::AccountsHistory>(sentinel_key.clone()).unwrap();
assert!(shard.is_some());
assert_eq!(shard.unwrap().len(), limit as u64);
// Second batch: add another NUM_OF_INDICES_IN_SHARD + 1 indices (causing 2 more shards)
let second_batch_indices: Vec<u64> = (limit as u64..=(2 * limit) as u64).collect();
let mut batch = provider.batch();
batch.append_account_history_shard(address, second_batch_indices).unwrap();
batch.commit().unwrap();
// Now we should have: 2 completed shards + 1 sentinel shard
let first_completed = ShardedKey::new(address, (limit - 1) as u64);
let second_completed = ShardedKey::new(address, (2 * limit - 1) as u64);
assert!(
provider.get::<tables::AccountsHistory>(first_completed).unwrap().is_some(),
"first completed shard should exist"
);
assert!(
provider.get::<tables::AccountsHistory>(second_completed).unwrap().is_some(),
"second completed shard should exist"
);
assert!(
provider.get::<tables::AccountsHistory>(sentinel_key).unwrap().is_some(),
"sentinel shard should exist"
);
}
#[test]
fn test_storage_history_shard_split_at_boundary() {
let temp_dir = TempDir::new().unwrap();
let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
let address = Address::from([0x44; 20]);
let slot = B256::from([0x55; 32]);
let limit = NUM_OF_INDICES_IN_SHARD;
// Add exactly NUM_OF_INDICES_IN_SHARD + 1 indices to trigger a split
let indices: Vec<u64> = (0..=(limit as u64)).collect();
let mut batch = provider.batch();
batch.append_storage_history_shard(address, slot, indices).unwrap();
batch.commit().unwrap();
// Should have 2 shards: one completed shard and one sentinel shard
let completed_key = StorageShardedKey::new(address, slot, (limit - 1) as u64);
let sentinel_key = StorageShardedKey::new(address, slot, u64::MAX);
let completed_shard = provider.get::<tables::StoragesHistory>(completed_key).unwrap();
let sentinel_shard = provider.get::<tables::StoragesHistory>(sentinel_key).unwrap();
assert!(completed_shard.is_some(), "completed shard should exist");
assert!(sentinel_shard.is_some(), "sentinel shard should exist");
let completed_shard = completed_shard.unwrap();
let sentinel_shard = sentinel_shard.unwrap();
assert_eq!(completed_shard.len(), limit as u64, "completed shard should be full");
assert_eq!(sentinel_shard.len(), 1, "sentinel shard should have 1 element");
}
#[test]
fn test_storage_history_multiple_shard_splits() {
let temp_dir = TempDir::new().unwrap();
let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
let address = Address::from([0x46; 20]);
let slot = B256::from([0x57; 32]);
let limit = NUM_OF_INDICES_IN_SHARD;
// First batch: add NUM_OF_INDICES_IN_SHARD indices
let first_batch_indices: Vec<u64> = (0..limit as u64).collect();
let mut batch = provider.batch();
batch.append_storage_history_shard(address, slot, first_batch_indices).unwrap();
batch.commit().unwrap();
// Should have just a sentinel shard (exactly at limit, not over)
let sentinel_key = StorageShardedKey::new(address, slot, u64::MAX);
let shard = provider.get::<tables::StoragesHistory>(sentinel_key.clone()).unwrap();
assert!(shard.is_some());
assert_eq!(shard.unwrap().len(), limit as u64);
// Second batch: add another NUM_OF_INDICES_IN_SHARD + 1 indices (causing 2 more shards)
let second_batch_indices: Vec<u64> = (limit as u64..=(2 * limit) as u64).collect();
let mut batch = provider.batch();
batch.append_storage_history_shard(address, slot, second_batch_indices).unwrap();
batch.commit().unwrap();
// Now we should have: 2 completed shards + 1 sentinel shard
let first_completed = StorageShardedKey::new(address, slot, (limit - 1) as u64);
let second_completed = StorageShardedKey::new(address, slot, (2 * limit - 1) as u64);
assert!(
provider.get::<tables::StoragesHistory>(first_completed).unwrap().is_some(),
"first completed shard should exist"
);
assert!(
provider.get::<tables::StoragesHistory>(second_completed).unwrap().is_some(),
"second completed shard should exist"
);
assert!(
provider.get::<tables::StoragesHistory>(sentinel_key).unwrap().is_some(),
"sentinel shard should exist"
);
}
}

View File

@@ -1,12 +1,11 @@
use crate::{
AccountReader, BlockHashReader, ChangeSetReader, HashedPostStateProvider, ProviderError,
StateProvider, StateRootProvider,
AccountReader, BlockHashReader, ChangeSetReader, EitherReader, HashedPostStateProvider,
ProviderError, RocksDBProviderFactory, StateProvider, StateRootProvider,
};
use alloy_eips::merge::EPOCH_SLOTS;
use alloy_primitives::{Address, BlockNumber, Bytes, StorageKey, StorageValue, B256};
use reth_db_api::{
cursor::{DbCursorRO, DbDupCursorRO},
models::{storage_sharded_key::StorageShardedKey, ShardedKey},
table::Table,
tables,
transaction::DbTx,
@@ -14,7 +13,8 @@ use reth_db_api::{
};
use reth_primitives_traits::{Account, Bytecode};
use reth_storage_api::{
BlockNumReader, BytecodeReader, DBProvider, StateProofProvider, StorageRootProvider,
BlockNumReader, BytecodeReader, DBProvider, NodePrimitivesProvider, StateProofProvider,
StorageRootProvider, StorageSettingsCache,
};
use reth_storage_errors::provider::ProviderResult;
use reth_trie::{
@@ -127,38 +127,47 @@ impl<'b, Provider: DBProvider + ChangeSetReader + BlockNumReader>
Self { provider, block_number, lowest_available_blocks }
}
/// Lookup an account in the `AccountsHistory` table
pub fn account_history_lookup(&self, address: Address) -> ProviderResult<HistoryInfo> {
/// Lookup an account in the `AccountsHistory` table using `EitherReader`.
pub fn account_history_lookup(&self, address: Address) -> ProviderResult<HistoryInfo>
where
Provider: StorageSettingsCache + RocksDBProviderFactory + NodePrimitivesProvider,
{
if !self.lowest_available_blocks.is_account_history_available(self.block_number) {
return Err(ProviderError::StateAtBlockPruned(self.block_number))
}
// history key to search IntegerList of block number changesets.
let history_key = ShardedKey::new(address, self.block_number);
self.history_info_lookup::<tables::AccountsHistory, _>(
history_key,
|key| key.key == address,
self.lowest_available_blocks.account_history_block_number,
)
self.provider.with_rocksdb_tx(|rocks_tx_ref| {
let mut reader = EitherReader::new_accounts_history(self.provider, rocks_tx_ref)?;
reader.account_history_info(
address,
self.block_number,
self.lowest_available_blocks.account_history_block_number,
)
})
}
/// Lookup a storage key in the `StoragesHistory` table
/// Lookup a storage key in the `StoragesHistory` table using `EitherReader`.
pub fn storage_history_lookup(
&self,
address: Address,
storage_key: StorageKey,
) -> ProviderResult<HistoryInfo> {
) -> ProviderResult<HistoryInfo>
where
Provider: StorageSettingsCache + RocksDBProviderFactory + NodePrimitivesProvider,
{
if !self.lowest_available_blocks.is_storage_history_available(self.block_number) {
return Err(ProviderError::StateAtBlockPruned(self.block_number))
}
// history key to search IntegerList of block number changesets.
let history_key = StorageShardedKey::new(address, storage_key, self.block_number);
self.history_info_lookup::<tables::StoragesHistory, _>(
history_key,
|key| key.address == address && key.sharded_key.key == storage_key,
self.lowest_available_blocks.storage_history_block_number,
)
self.provider.with_rocksdb_tx(|rocks_tx_ref| {
let mut reader = EitherReader::new_storages_history(self.provider, rocks_tx_ref)?;
reader.storage_history_info(
address,
storage_key,
self.block_number,
self.lowest_available_blocks.storage_history_block_number,
)
})
}
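`EitherReader` itself is introduced elsewhere in this PR and is not shown in this diff; the useful mental model (an illustrative sketch only, not the actual definition) is a reader constructed against one backend or the other, depending on the node's storage settings, exposing the same history-info queries over both:

    // Hypothetical shape for illustration; the real type, fields and variant names
    // live in the EitherReader module added by this PR and may differ.
    enum EitherReaderSketch<MdbxCursor, RocksTxRef> {
        Mdbx(MdbxCursor),   // history tables still in MDBX: cursor-based history_info lookup
        Rocks(RocksTxRef),  // history tables migrated to RocksDB: iterator-based lookup
    }

With that routing in place, `account_history_lookup` and `storage_history_lookup` above no longer need to know which backend holds the `AccountsHistory`/`StoragesHistory` tables.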
/// Checks and returns `true` if distance to historical block exceeds the provided limit.
@@ -204,25 +213,6 @@ impl<'b, Provider: DBProvider + ChangeSetReader + BlockNumReader>
Ok(HashedStorage::from_reverts(self.tx(), address, self.block_number)?)
}
fn history_info_lookup<T, K>(
&self,
key: K,
key_filter: impl Fn(&K) -> bool,
lowest_available_block_number: Option<BlockNumber>,
) -> ProviderResult<HistoryInfo>
where
T: Table<Key = K, Value = BlockNumberList>,
{
let mut cursor = self.tx().cursor_read::<T>()?;
history_info::<T, K, _>(
&mut cursor,
key,
self.block_number,
key_filter,
lowest_available_block_number,
)
}
/// Set the lowest block number at which the account history is available.
pub const fn with_lowest_available_account_history_block_number(
mut self,
@@ -248,8 +238,14 @@ impl<Provider: DBProvider + BlockNumReader> HistoricalStateProviderRef<'_, Provi
}
}
impl<Provider: DBProvider + BlockNumReader + ChangeSetReader> AccountReader
for HistoricalStateProviderRef<'_, Provider>
impl<
Provider: DBProvider
+ BlockNumReader
+ ChangeSetReader
+ StorageSettingsCache
+ RocksDBProviderFactory
+ NodePrimitivesProvider,
> AccountReader for HistoricalStateProviderRef<'_, Provider>
{
/// Get basic account information.
fn basic_account(&self, address: &Address) -> ProviderResult<Option<Account>> {
@@ -404,8 +400,15 @@ impl<Provider> HashedPostStateProvider for HistoricalStateProviderRef<'_, Provid
}
}
impl<Provider: DBProvider + BlockNumReader + BlockHashReader + ChangeSetReader> StateProvider
for HistoricalStateProviderRef<'_, Provider>
impl<
Provider: DBProvider
+ BlockNumReader
+ BlockHashReader
+ ChangeSetReader
+ StorageSettingsCache
+ RocksDBProviderFactory
+ NodePrimitivesProvider,
> StateProvider for HistoricalStateProviderRef<'_, Provider>
{
/// Get storage.
fn storage(
@@ -495,7 +498,7 @@ impl<Provider: DBProvider + ChangeSetReader + BlockNumReader> HistoricalStatePro
}
// Delegates all provider impls to [HistoricalStateProviderRef]
reth_storage_api::macros::delegate_provider_impls!(HistoricalStateProvider<Provider> where [Provider: DBProvider + BlockNumReader + BlockHashReader + ChangeSetReader]);
reth_storage_api::macros::delegate_provider_impls!(HistoricalStateProvider<Provider> where [Provider: DBProvider + BlockNumReader + BlockHashReader + ChangeSetReader + StorageSettingsCache + RocksDBProviderFactory + NodePrimitivesProvider]);
/// Lowest blocks at which different parts of the state are available.
/// They may be [Some] if pruning is enabled.
@@ -525,6 +528,32 @@ impl LowestAvailableBlocks {
}
}
/// Computes the rank and finds the next modification block in a history shard.
///
/// Given a `block_number`, this function returns:
/// - `rank`: The number of entries strictly before `block_number` in the shard
/// - `found_block`: The block number at position `rank` (i.e., the first block >= `block_number`
/// where a modification occurred), or `None` if `rank` is out of bounds
///
/// The rank is adjusted when `block_number` exactly matches an entry in the shard,
/// so that `found_block` always returns the modification at or after the target.
///
/// This logic is shared between MDBX cursor-based lookups and `RocksDB` iterator lookups.
#[inline]
pub fn compute_history_rank(
chunk: &reth_db_api::BlockNumberList,
block_number: BlockNumber,
) -> (u64, Option<u64>) {
let mut rank = chunk.rank(block_number);
// `rank(block_number)` returns count of entries <= block_number.
// We want the first entry >= block_number, so if block_number is in the shard,
// we need to step back one position to point at it (not past it).
if rank.checked_sub(1).and_then(|r| chunk.select(r)) == Some(block_number) {
rank -= 1;
}
(rank, chunk.select(rank))
}
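A worked example of the adjusted rank, modelled on a plain sorted slice with `partition_point` instead of the real `BlockNumberList` (assumption: `rank`/`select` behave like counting into and indexing that sorted list):

    fn compute_history_rank_model(shard: &[u64], block_number: u64) -> (usize, Option<u64>) {
        // Number of entries strictly before `block_number` (the adjusted rank).
        let rank = shard.partition_point(|&b| b < block_number);
        // First modification at or after `block_number`, if any.
        (rank, shard.get(rank).copied())
    }

    fn main() {
        let shard = [4, 7, 10];
        assert_eq!(compute_history_rank_model(&shard, 3), (0, Some(4)));  // before the first write
        assert_eq!(compute_history_rank_model(&shard, 7), (1, Some(7)));  // exact hit stays on 7
        assert_eq!(compute_history_rank_model(&shard, 8), (2, Some(10))); // next change is block 10
        assert_eq!(compute_history_rank_model(&shard, 11), (3, None));    // past the last entry
    }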
/// Checks if a previous shard lookup is needed to determine if we're before the first write.
///
/// Returns `true` when `rank == 0` (first entry in shard) and the found block doesn't match
@@ -557,16 +586,7 @@ where
// index, the first chunk for the next key will be returned so we filter out chunks that
// have a different key.
if let Some(chunk) = cursor.seek(key)?.filter(|(k, _)| key_filter(k)).map(|x| x.1) {
// Get the rank of the first entry before or equal to our block.
let mut rank = chunk.rank(block_number);
// Adjust the rank, so that we have the rank of the first entry strictly before our
// block (not equal to it).
if rank.checked_sub(1).and_then(|r| chunk.select(r)) == Some(block_number) {
rank -= 1;
}
let found_block = chunk.select(rank);
let (rank, found_block) = compute_history_rank(&chunk, block_number);
// If our block is before the first entry in the index chunk and this first entry
// doesn't equal to our block, it might be before the first write ever. To check, we
@@ -598,7 +618,8 @@ mod tests {
use crate::{
providers::state::historical::{HistoryInfo, LowestAvailableBlocks},
test_utils::create_test_provider_factory,
AccountReader, HistoricalStateProvider, HistoricalStateProviderRef, StateProvider,
AccountReader, HistoricalStateProvider, HistoricalStateProviderRef, RocksDBProviderFactory,
StateProvider,
};
use alloy_primitives::{address, b256, Address, B256, U256};
use reth_db_api::{
@@ -610,6 +631,7 @@ mod tests {
use reth_primitives_traits::{Account, StorageEntry};
use reth_storage_api::{
BlockHashReader, BlockNumReader, ChangeSetReader, DBProvider, DatabaseProviderFactory,
NodePrimitivesProvider, StorageSettingsCache,
};
use reth_storage_errors::provider::ProviderError;
@@ -621,7 +643,13 @@ mod tests {
const fn assert_state_provider<T: StateProvider>() {}
#[expect(dead_code)]
const fn assert_historical_state_provider<
T: DBProvider + BlockNumReader + BlockHashReader + ChangeSetReader,
T: DBProvider
+ BlockNumReader
+ BlockHashReader
+ ChangeSetReader
+ StorageSettingsCache
+ RocksDBProviderFactory
+ NodePrimitivesProvider,
>() {
assert_state_provider::<HistoricalStateProvider<T>>();
}

View File

@@ -1,4 +1,5 @@
use crate::providers::RocksDBProvider;
use crate::{either_writer::RocksTxRefArg, providers::RocksDBProvider};
use reth_storage_errors::provider::ProviderResult;
/// `RocksDB` provider factory.
///
@@ -13,4 +14,21 @@ pub trait RocksDBProviderFactory {
/// commits, ensuring atomicity across all storage backends.
#[cfg(all(unix, feature = "rocksdb"))]
fn set_pending_rocksdb_batch(&self, batch: rocksdb::WriteBatchWithTransaction<true>);
/// Executes a closure with a `RocksDB` transaction for reading.
///
/// This helper encapsulates all the cfg-gated `RocksDB` transaction handling for reads.
fn with_rocksdb_tx<F, R>(&self, f: F) -> ProviderResult<R>
where
F: FnOnce(RocksTxRefArg<'_>) -> ProviderResult<R>,
{
#[cfg(all(unix, feature = "rocksdb"))]
{
let rocksdb = self.rocksdb_provider();
let tx = rocksdb.tx();
f(&tx)
}
#[cfg(not(all(unix, feature = "rocksdb")))]
f(())
}
}