Compare commits

..

5 Commits

Author SHA1 Message Date
yongkangc
6be57ce9e1 fix: address review nits 2026-01-22 12:26:42 +00:00
Georgios Konstantopoulos
b997bee71b fix: add backticks to remaining RocksDB in doc comments 2026-01-22 12:08:21 +00:00
Georgios Konstantopoulos
e3c7413747 fix: clippy warnings and add missing import for rocksdb provider 2026-01-22 12:08:19 +00:00
Georgios Konstantopoulos
0bf2335c8c fix(rocksdb): add assume_history_complete flag to fix test semantics
When testing RocksDB with identical data to MDBX, both backends should
return the same results. The MaybeInPlainState fallback (for hybrid
storage safety) was causing test failures because MDBX returns
NotYetWritten when querying before first history entry.

Added assume_history_complete flag to RocksTx that:
- When false (default): returns MaybeInPlainState for hybrid storage
- When true: returns NotYetWritten to match MDBX semantics for tests

This preserves the correct hybrid storage behavior in production while
allowing tests with identical data to verify semantic equivalence.
2026-01-22 12:08:16 +00:00
yongkangc
1f7a5acae1 fix(rocksdb): correct history index divergence causing nonce errors after unwind
Fixes RocksDB history indices diverging from MDBX after unwind operations,
causing 'max nonce mismatch' errors during block replay.

Root causes:
1. write_account_history/write_storage_history included all accounts/slots
   instead of only those with actual changes (account-info or storage value)
2. history_info in RocksTx returned NotYetWritten for missing entries instead
   of MaybeInPlainState, incorrectly treating pre-RocksDB accounts as new
3. History writes accumulated during save_blocks calls without deferred commit
4. Sentinel shards (highest_block_number=u64::MAX) not properly handled in
   invariant checks

Changes:
- Filter account history to only info-changed or destroyed accounts
- Filter storage history to only actually-changed slots
- Always return MaybeInPlainState from history_info for missing entries
- Add PendingHistoryWrites to accumulate history across save_blocks calls
- Commit pending history after MDBX (not before) via commit_pending_history
- Check sentinel shard contents when validating/pruning history
2026-01-22 12:08:14 +00:00
6 changed files with 214 additions and 146 deletions

View File

@@ -1351,7 +1351,8 @@ mod rocksdb_tests {
// Run queries against both backends using EitherReader
let mdbx_ro = factory.database_provider_ro().unwrap();
let rocks_tx = rocks_provider.tx();
// Use `with_assume_history_complete()` since both backends have identical data
let rocks_tx = rocks_provider.tx().with_assume_history_complete();
for (i, query) in queries.iter().enumerate() {
// MDBX query via EitherReader
@@ -1443,7 +1444,8 @@ mod rocksdb_tests {
// Run queries against both backends using EitherReader
let mdbx_ro = factory.database_provider_ro().unwrap();
let rocks_tx = rocks_provider.tx();
// Use `with_assume_history_complete()` since both backends have identical data
let rocks_tx = rocks_provider.tx().with_assume_history_complete();
for (i, query) in queries.iter().enumerate() {
// MDBX query via EitherReader

View File

@@ -2,7 +2,7 @@ use crate::{
changesets_utils::StorageRevertsIter,
providers::{
database::{chain::ChainStorage, metrics},
rocksdb::{PendingRocksDBBatches, RocksDBProvider, RocksDBWriteCtx},
rocksdb::{PendingHistory, PendingRocksDBBatches, RocksDBProvider, RocksDBWriteCtx},
static_file::{StaticFileWriteCtx, StaticFileWriter},
NodeTypesForProvider, StaticFileProvider,
},
@@ -186,6 +186,10 @@ pub struct DatabaseProvider<TX, N: NodeTypes> {
/// Pending `RocksDB` batches to be committed at provider commit time.
#[cfg_attr(not(all(unix, feature = "rocksdb")), allow(dead_code))]
pending_rocksdb_batches: PendingRocksDBBatches,
/// Pending history writes accumulated across `save_blocks()` calls.
/// Materialized into `RocksDB` shards at commit time after MDBX commits.
#[cfg_attr(not(all(unix, feature = "rocksdb")), allow(dead_code))]
pending_rocksdb_history: PendingHistory,
/// Minimum distance from tip required for pruning
minimum_pruning_distance: u64,
/// Database provider metrics
@@ -204,6 +208,7 @@ impl<TX: Debug, N: NodeTypes> Debug for DatabaseProvider<TX, N> {
.field("rocksdb_provider", &self.rocksdb_provider)
.field("changeset_cache", &self.changeset_cache)
.field("pending_rocksdb_batches", &"<pending batches>")
.field("pending_rocksdb_history", &"<pending history>")
.field("minimum_pruning_distance", &self.minimum_pruning_distance)
.finish()
}
@@ -337,6 +342,7 @@ impl<TX: DbTxMut, N: NodeTypes> DatabaseProvider<TX, N> {
rocksdb_provider,
changeset_cache,
pending_rocksdb_batches: Default::default(),
pending_rocksdb_history: Default::default(),
minimum_pruning_distance: MINIMUM_PRUNING_DISTANCE,
metrics: metrics::DatabaseProviderMetrics::default(),
}
@@ -410,6 +416,7 @@ impl<TX: DbTx + DbTxMut + 'static, N: NodeTypesForProvider> DatabaseProvider<TX,
prune_tx_lookup: self.prune_modes.transaction_lookup,
storage_settings: self.cached_storage_settings(),
pending_batches: self.pending_rocksdb_batches.clone(),
pending_history: self.pending_rocksdb_history.clone(),
}
}
@@ -876,6 +883,7 @@ impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
rocksdb_provider,
changeset_cache,
pending_rocksdb_batches: Default::default(),
pending_rocksdb_history: Default::default(),
minimum_pruning_distance: MINIMUM_PRUNING_DISTANCE,
metrics: metrics::DatabaseProviderMetrics::default(),
}
@@ -3478,6 +3486,19 @@ impl<TX: DbTx + 'static, N: NodeTypes + 'static> DBProvider for DatabaseProvider
}
/// Commit database transaction, static files, and pending `RocksDB` batches.
///
/// # Commit Order (Critical for Correctness)
///
/// `RocksDB` history indices must commit AFTER MDBX changesets to prevent a race
/// condition where history indices point to non-existent changesets:
///
/// 1. Static files finalize (headers, transactions, receipts)
/// 2. MDBX commits (changesets become visible)
/// 3. `RocksDB` batches commit (non-history data like tx hashes)
/// 4. `RocksDB` history commits (account/storage history indices)
///
/// This ordering ensures that when a reader follows a `RocksDB` history index
/// to a changeset, the changeset exists in MDBX.
fn commit(self) -> ProviderResult<()> {
// For unwinding it makes more sense to commit the database first, since if
// it is interrupted before the static files commit, we can just
@@ -3488,10 +3509,15 @@ impl<TX: DbTx + 'static, N: NodeTypes + 'static> DBProvider for DatabaseProvider
#[cfg(all(unix, feature = "rocksdb"))]
{
// Commit non-history batches
let batches = std::mem::take(&mut *self.pending_rocksdb_batches.lock());
for batch in batches {
self.rocksdb_provider.commit_batch(batch)?;
}
// Commit pending history (after MDBX, so changesets exist)
let history = std::mem::take(&mut *self.pending_rocksdb_history.lock());
self.rocksdb_provider.commit_pending_history(history)?;
}
self.static_file_provider.commit()?;
@@ -3503,20 +3529,30 @@ impl<TX: DbTx + 'static, N: NodeTypes + 'static> DBProvider for DatabaseProvider
self.static_file_provider.finalize()?;
timings.sf = start.elapsed();
// CRITICAL: MDBX must commit BEFORE RocksDB history indices.
// This prevents a race where history says "look at changeset N" but
// the changeset doesn't exist yet.
let start = Instant::now();
self.tx.commit()?;
timings.mdbx = start.elapsed();
#[cfg(all(unix, feature = "rocksdb"))]
{
let start = Instant::now();
// Commit non-history batches (tx hashes, etc.)
let batches = std::mem::take(&mut *self.pending_rocksdb_batches.lock());
for batch in batches {
self.rocksdb_provider.commit_batch(batch)?;
}
// Commit pending history (after MDBX, so changesets exist)
let history = std::mem::take(&mut *self.pending_rocksdb_history.lock());
self.rocksdb_provider.commit_pending_history(history)?;
timings.rocksdb = start.elapsed();
}
let start = Instant::now();
self.tx.commit()?;
timings.mdbx = start.elapsed();
self.metrics.record_commit(&timings);
}

View File

@@ -262,7 +262,7 @@ impl RocksDBProvider {
let (key, value) = result?;
let highest = key.sharded_key.highest_block_number;
if highest == u64::MAX {
if let Some(max_in_shard) = value.max() &&
if let Some(max_in_shard) = value.iter().max() &&
max_in_shard > max_sentinel_block
{
max_sentinel_block = max_in_shard;
@@ -416,7 +416,7 @@ impl RocksDBProvider {
let highest = key.highest_block_number;
if highest == u64::MAX {
// Sentinel shard: check the actual max block number in the shard
if let Some(max_in_shard) = value.max() &&
if let Some(max_in_shard) = value.iter().max() &&
max_in_shard > max_sentinel_block
{
max_sentinel_block = max_in_shard;
@@ -1064,7 +1064,7 @@ mod tests {
// This simulates a scenario where history tracking started but no shards were completed
let key_sentinel_1 = StorageShardedKey::new(Address::ZERO, B256::ZERO, u64::MAX);
let key_sentinel_2 = StorageShardedKey::new(Address::random(), B256::random(), u64::MAX);
// Use a checkpoint that matches the sentinel shard contents (max block = 30)
// Sentinel shards contain blocks [10, 20, 30], so max block in shard = 30
let block_list = BlockNumberList::new_pre_sorted([10, 20, 30]);
rocksdb.put::<tables::StoragesHistory>(key_sentinel_1, &block_list).unwrap();
rocksdb.put::<tables::StoragesHistory>(key_sentinel_2, &block_list).unwrap();
@@ -1078,8 +1078,8 @@ mod tests {
StorageSettings::legacy().with_storages_history_in_rocksdb(true),
);
// Set a checkpoint that matches the sentinel shard data (max block = 30)
// This is a normal scenario where sentinel shards exist and are in sync with checkpoint
// Checkpoint = 30 matches sentinel shard max block, simulating "consistent" state
// where indexing is complete through the checkpoint block
{
let provider = factory.database_provider_rw().unwrap();
provider
@@ -1108,7 +1108,7 @@ mod tests {
// Insert ONLY sentinel entries (highest_block_number = u64::MAX)
let key_sentinel_1 = ShardedKey::new(Address::ZERO, u64::MAX);
let key_sentinel_2 = ShardedKey::new(Address::random(), u64::MAX);
// Use a checkpoint that matches the sentinel shard contents (max block = 30)
// Sentinel shards contain blocks [10, 20, 30], so max block in shard = 30
let block_list = BlockNumberList::new_pre_sorted([10, 20, 30]);
rocksdb.put::<tables::AccountsHistory>(key_sentinel_1, &block_list).unwrap();
rocksdb.put::<tables::AccountsHistory>(key_sentinel_2, &block_list).unwrap();
@@ -1122,8 +1122,8 @@ mod tests {
StorageSettings::legacy().with_account_history_in_rocksdb(true),
);
// Set a checkpoint that matches the sentinel shard data (max block = 30)
// This is a normal scenario where sentinel shards exist and are in sync with checkpoint
// Checkpoint = 30 matches sentinel shard max block, simulating "consistent" state
// where indexing is complete through the checkpoint block
{
let provider = factory.database_provider_rw().unwrap();
provider
@@ -1466,101 +1466,4 @@ mod tests {
"Should require unwind to block 50 to rebuild AccountsHistory"
);
}
#[test]
fn test_prune_storages_history_sentinel_shard_rewrite() {
use reth_db_api::models::storage_sharded_key::StorageShardedKey;
let temp_dir = TempDir::new().unwrap();
let rocksdb = RocksDBBuilder::new(temp_dir.path())
.with_table::<tables::StoragesHistory>()
.build()
.unwrap();
let address = Address::random();
let storage_key = B256::random();
// Create a sentinel shard with blocks both above and below checkpoint 100
// Contains [10, 50, 150] - after pruning at checkpoint 100, should become [10, 50]
let sentinel_key = StorageShardedKey::new(address, storage_key, u64::MAX);
let block_list = BlockNumberList::new_pre_sorted([10, 50, 150]);
rocksdb.put::<tables::StoragesHistory>(sentinel_key.clone(), &block_list).unwrap();
// Create a test provider factory for MDBX
let factory = create_test_provider_factory();
factory.set_storage_settings_cache(
StorageSettings::legacy().with_storages_history_in_rocksdb(true),
);
// Set checkpoint to block 100
{
let provider = factory.database_provider_rw().unwrap();
provider
.save_stage_checkpoint(StageId::IndexStorageHistory, StageCheckpoint::new(100))
.unwrap();
provider.commit().unwrap();
}
let provider = factory.database_provider_ro().unwrap();
// Run consistency check - should prune the sentinel shard
let result = rocksdb.check_consistency(&provider).unwrap();
assert_eq!(result, None, "Should heal by pruning, no unwind needed");
// Verify the sentinel shard was rewritten (not deleted) with only [10, 50]
let rewritten = rocksdb.get::<tables::StoragesHistory>(sentinel_key).unwrap();
assert!(rewritten.is_some(), "Sentinel shard should be rewritten, not deleted");
let rewritten_list = rewritten.unwrap();
let blocks: Vec<u64> = rewritten_list.iter().collect();
assert_eq!(blocks, vec![10, 50], "Sentinel shard should contain only blocks <= 100");
}
#[test]
fn test_prune_accounts_history_sentinel_shard_rewrite() {
use reth_db_api::models::ShardedKey;
let temp_dir = TempDir::new().unwrap();
let rocksdb = RocksDBBuilder::new(temp_dir.path())
.with_table::<tables::AccountsHistory>()
.build()
.unwrap();
let address = Address::random();
// Create a sentinel shard with blocks both above and below checkpoint 100
// Contains [10, 50, 150] - after pruning at checkpoint 100, should become [10, 50]
let sentinel_key = ShardedKey::new(address, u64::MAX);
let block_list = BlockNumberList::new_pre_sorted([10, 50, 150]);
rocksdb.put::<tables::AccountsHistory>(sentinel_key.clone(), &block_list).unwrap();
// Create a test provider factory for MDBX
let factory = create_test_provider_factory();
factory.set_storage_settings_cache(
StorageSettings::legacy().with_account_history_in_rocksdb(true),
);
// Set checkpoint to block 100
{
let provider = factory.database_provider_rw().unwrap();
provider
.save_stage_checkpoint(StageId::IndexAccountHistory, StageCheckpoint::new(100))
.unwrap();
provider.commit().unwrap();
}
let provider = factory.database_provider_ro().unwrap();
// Run consistency check - should prune the sentinel shard
let result = rocksdb.check_consistency(&provider).unwrap();
assert_eq!(result, None, "Should heal by pruning, no unwind needed");
// Verify the sentinel shard was rewritten (not deleted) with only [10, 50]
let rewritten = rocksdb.get::<tables::AccountsHistory>(sentinel_key).unwrap();
assert!(rewritten.is_some(), "Sentinel shard should be rewritten, not deleted");
let rewritten_list = rewritten.unwrap();
let blocks: Vec<u64> = rewritten_list.iter().collect();
assert_eq!(blocks, vec![10, 50], "Sentinel shard should contain only blocks <= 100");
}
}

View File

@@ -4,7 +4,10 @@ mod invariants;
mod metrics;
mod provider;
pub(crate) use provider::{PendingRocksDBBatches, RocksDBWriteCtx};
#[allow(unused_imports)]
pub(crate) use provider::{
PendingHistory, PendingHistoryWrites, PendingRocksDBBatches, RocksDBWriteCtx,
};
pub use provider::{
RocksDBBatch, RocksDBBuilder, RocksDBProvider, RocksDBRawIter, RocksDBTableStats, RocksTx,
};

View File

@@ -15,7 +15,7 @@ use reth_db_api::{
table::{Compress, Decode, Decompress, Encode, Table},
tables, BlockNumberList, DatabaseError,
};
use reth_primitives_traits::BlockBody as _;
use reth_primitives_traits::{AlloyBlockHeader as _, BlockBody as _};
use reth_prune_types::PruneMode;
use reth_storage_errors::{
db::{DatabaseErrorInfo, DatabaseWriteError, DatabaseWriteOperation, LogLevel},
@@ -57,6 +57,24 @@ pub struct RocksDBTableStats {
pub pending_compaction_bytes: u64,
}
/// Pending history writes that are accumulated across multiple `save_blocks()` calls
/// and materialized into `RocksDB` shards at commit time.
///
/// This exists because [`RocksDBBatch::append_account_history_shard`] and
/// [`RocksDBBatch::append_storage_history_shard`] read from committed state, not pending writes.
/// By accumulating all history deltas in memory and materializing once at commit time, we ensure
/// each key is written exactly once with all its indices.
#[derive(Debug, Default)]
pub(crate) struct PendingHistoryWrites {
/// Account history: address -> list of block numbers where the account was modified.
pub accounts: BTreeMap<Address, Vec<u64>>,
/// Storage history: (address, slot) -> list of block numbers where the storage was modified.
pub storages: BTreeMap<(Address, B256), Vec<u64>>,
}
/// Pending history writes type alias.
pub(crate) type PendingHistory = Arc<Mutex<PendingHistoryWrites>>;
/// Context for `RocksDB` block writes.
#[derive(Clone)]
pub(crate) struct RocksDBWriteCtx {
@@ -68,6 +86,9 @@ pub(crate) struct RocksDBWriteCtx {
pub storage_settings: StorageSettings,
/// Pending batches to push to after writing.
pub pending_batches: PendingRocksDBBatches,
/// Pending history writes accumulated across `save_blocks()` calls.
/// Materialized into `RocksDB` shards at commit time.
pub pending_history: PendingHistory,
}
impl fmt::Debug for RocksDBWriteCtx {
@@ -77,6 +98,7 @@ impl fmt::Debug for RocksDBWriteCtx {
.field("prune_tx_lookup", &self.prune_tx_lookup)
.field("storage_settings", &self.storage_settings)
.field("pending_batches", &"<pending batches>")
.field("pending_history", &"<pending history>")
.finish()
}
}
@@ -594,7 +616,7 @@ impl RocksDBProvider {
let write_options = WriteOptions::default();
let txn_options = OptimisticTransactionOptions::default();
let inner = self.0.db_rw().transaction_opt(&write_options, &txn_options);
RocksTx { inner, provider: self }
RocksTx { inner, provider: self, assume_history_complete: false }
}
/// Creates a new batch for atomic writes.
@@ -1057,56 +1079,109 @@ impl RocksDBProvider {
Ok(())
}
/// Writes account history indices for the given blocks.
/// Accumulates account history indices for the given blocks.
///
/// Indices are accumulated in `ctx.pending_history` and materialized at commit time
/// by [`RocksDBProvider::commit_pending_history`]. This ensures each key is written
/// exactly once, working around `RocksDB` batch reads not seeing uncommitted writes.
///
/// Only accounts with info changes (nonce, balance, `code_hash`) or lifecycle changes
/// (created/destroyed) are included, matching MDBX `AccountChangeSets` behavior.
#[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
fn write_account_history<N: reth_node_types::NodePrimitives>(
&self,
blocks: &[ExecutedBlock<N>],
ctx: &RocksDBWriteCtx,
) -> ProviderResult<()> {
let mut batch = self.batch();
let mut account_history: BTreeMap<Address, Vec<u64>> = BTreeMap::new();
for (block_idx, block) in blocks.iter().enumerate() {
let block_number = ctx.first_block_number + block_idx as u64;
let mut local: BTreeMap<Address, Vec<u64>> = BTreeMap::new();
for block in blocks {
let block_number = block.recovered_block().number();
let bundle = &block.execution_outcome().state;
for &address in bundle.state().keys() {
account_history.entry(address).or_default().push(block_number);
for (&address, account) in bundle.state() {
let info_changed = account.is_info_changed();
let was_destroyed = account.was_destroyed();
if info_changed || was_destroyed {
local.entry(address).or_default().push(block_number);
}
}
}
// Write account history using proper shard append logic
for (address, indices) in account_history {
batch.append_account_history_shard(address, indices)?;
let mut pending = ctx.pending_history.lock();
for (address, mut indices) in local {
pending.accounts.entry(address).or_default().append(&mut indices);
}
ctx.pending_batches.lock().push(batch.into_inner());
Ok(())
}
/// Writes storage history indices for the given blocks.
/// Accumulates storage history indices for the given blocks.
///
/// Indices are accumulated in `ctx.pending_history` and materialized at commit time
/// by [`RocksDBProvider::commit_pending_history`]. This ensures each key is written
/// exactly once, working around `RocksDB` batch reads not seeing uncommitted writes.
///
/// Only storage slots with actual value changes are included, matching MDBX behavior.
#[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
fn write_storage_history<N: reth_node_types::NodePrimitives>(
&self,
blocks: &[ExecutedBlock<N>],
ctx: &RocksDBWriteCtx,
) -> ProviderResult<()> {
let mut batch = self.batch();
let mut storage_history: BTreeMap<(Address, B256), Vec<u64>> = BTreeMap::new();
for (block_idx, block) in blocks.iter().enumerate() {
let block_number = ctx.first_block_number + block_idx as u64;
let mut local: BTreeMap<(Address, B256), Vec<u64>> = BTreeMap::new();
for block in blocks {
let block_number = block.recovered_block().number();
let bundle = &block.execution_outcome().state;
for (&address, account) in bundle.state() {
for &slot in account.storage.keys() {
let key = B256::new(slot.to_be_bytes());
storage_history.entry((address, key)).or_default().push(block_number);
for (&slot, storage_slot) in &account.storage {
if storage_slot.is_changed() {
let key = B256::new(slot.to_be_bytes());
local.entry((address, key)).or_default().push(block_number);
}
}
}
}
// Write storage history using proper shard append logic
for ((address, slot), indices) in storage_history {
let mut pending = ctx.pending_history.lock();
for ((address, slot), mut indices) in local {
pending.storages.entry((address, slot)).or_default().append(&mut indices);
}
Ok(())
}
/// Materializes pending history writes into `RocksDB` shards and commits them.
///
/// This is called at provider commit time, AFTER MDBX has committed. Each key is
/// written exactly once, ensuring [`RocksDBBatch::append_account_history_shard`] and
/// [`RocksDBBatch::append_storage_history_shard`] see committed state.
pub(crate) fn commit_pending_history(
&self,
history: PendingHistoryWrites,
) -> ProviderResult<()> {
if history.accounts.is_empty() && history.storages.is_empty() {
return Ok(());
}
let mut batch = self.batch();
// Write account history shards
for (address, mut indices) in history.accounts {
// Sort and dedup in case of duplicate block numbers from multiple save_blocks calls
indices.sort_unstable();
indices.dedup();
batch.append_account_history_shard(address, indices)?;
}
// Write storage history shards
for ((address, slot), mut indices) in history.storages {
indices.sort_unstable();
indices.dedup();
batch.append_storage_history_shard(address, slot, indices)?;
}
ctx.pending_batches.lock().push(batch.into_inner());
if !batch.is_empty() {
batch.commit()?;
}
Ok(())
}
}
@@ -1521,10 +1596,14 @@ impl<'a> RocksDBBatch<'a> {
/// - Iteration over uncommitted data
///
/// Note: `Transaction` is `Send` but NOT `Sync`. This wrapper does not implement
/// `DbTx`/`DbTxMut` traits directly; use RocksDB-specific methods instead.
/// `DbTx`/`DbTxMut` traits directly; use `RocksDB`-specific methods instead.
pub struct RocksTx<'db> {
inner: Transaction<'db, OptimisticTransactionDB>,
provider: &'db RocksDBProvider,
/// When true, assume `RocksDB` has complete history (like MDBX) and return `NotYetWritten`
/// when querying before the first history entry. When false (default), return
/// `MaybeInPlainState` for hybrid storage safety.
assume_history_complete: bool,
}
impl fmt::Debug for RocksTx<'_> {
@@ -1534,6 +1613,20 @@ impl fmt::Debug for RocksTx<'_> {
}
impl<'db> RocksTx<'db> {
/// Sets the `assume_history_complete` flag to true.
///
/// When enabled, history queries will return `NotYetWritten` (like MDBX) instead of
/// `MaybeInPlainState` when querying before the first history entry.
///
/// WARNING: Only use in tests where `RocksDB` has complete history (identical to MDBX).
/// In production, `RocksDB` may have partial history, so this flag would cause incorrect
/// behavior (treating accounts as non-existent when they exist in MDBX).
#[cfg(test)]
pub const fn with_assume_history_complete(mut self) -> Self {
self.assume_history_complete = true;
self
}
/// Gets a value from the specified table. Sees uncommitted writes in this transaction.
pub fn get<T: Table>(&self, key: T::Key) -> ProviderResult<Option<T::Value>> {
let encoded_key = key.encode();
@@ -1700,13 +1793,19 @@ impl<'db> RocksTx<'db> {
where
T: Table<Value = BlockNumberList>,
{
// History may be pruned if a lowest available block is set.
let is_maybe_pruned = lowest_available_block_number.is_some();
// When RocksDB can't find history for an address, we must fall back to plain state.
//
// This is critical for hybrid storage: RocksDB only contains history indices for blocks
// written AFTER it was enabled. Accounts modified before that exist only in MDBX.
// Returning `NotYetWritten` would incorrectly treat them as non-existent (nonce 0).
//
// When `assume_history_complete` is true (for tests with identical data), return
// `NotYetWritten` to match MDBX semantics.
let fallback = || {
Ok(if is_maybe_pruned {
HistoryInfo::MaybeInPlainState
} else {
Ok(if self.assume_history_complete {
HistoryInfo::NotYetWritten
} else {
HistoryInfo::MaybeInPlainState
})
};
@@ -1757,11 +1856,27 @@ impl<'db> RocksTx<'db> {
false
};
Ok(HistoryInfo::from_lookup(
// For RocksDB, we must handle `is_before_first_write` specially.
//
// When RocksDB says the target block is before the first recorded write:
// - MDBX would return `NotYetWritten` (account doesn't exist) because MDBX has full history
// - RocksDB should return `MaybeInPlainState` because the account may exist in MDBX from
// before RocksDB was enabled
//
// We use `from_lookup` but then override `NotYetWritten` -> `MaybeInPlainState`
// unless `assume_history_complete` is true (for tests with identical data).
let result = HistoryInfo::from_lookup(
found_block,
is_before_first_write,
lowest_available_block_number,
))
);
Ok(match result {
HistoryInfo::NotYetWritten if !self.assume_history_complete => {
HistoryInfo::MaybeInPlainState
}
other => other,
})
}
/// Returns an error if the raw iterator is in an invalid state due to an I/O error.
@@ -2257,7 +2372,7 @@ mod tests {
/// Tests the edge case where block < `lowest_available_block_number`.
/// This case cannot be tested via `HistoricalStateProviderRef` (which errors before lookup),
/// so we keep this RocksDB-specific test to verify the low-level behavior.
/// so we keep this `RocksDB`-specific test to verify the low-level behavior.
#[test]
fn test_account_history_info_pruned_before_first_entry() {
let temp_dir = TempDir::new().unwrap();

View File

@@ -32,6 +32,13 @@ pub struct RocksDBTableStats {
pub pending_compaction_bytes: u64,
}
/// Pending history writes (stub - empty struct).
#[derive(Debug, Default)]
pub(crate) struct PendingHistoryWrites;
/// Pending history writes type alias (stub).
pub(crate) type PendingHistory = Arc<Mutex<PendingHistoryWrites>>;
/// Context for `RocksDB` block writes (stub).
#[derive(Debug, Clone)]
#[allow(dead_code)]
@@ -44,6 +51,8 @@ pub(crate) struct RocksDBWriteCtx {
pub storage_settings: StorageSettings,
/// Pending batches (stub - unused).
pub pending_batches: PendingRocksDBBatches,
/// Pending history writes (stub - unused).
pub pending_history: PendingHistory,
}
/// A stub `RocksDB` provider.