Compare commits

..

2 Commits

Author SHA1 Message Date
yongkangc
08412060c5 fix: address review nits - use max() and add rewrite path tests 2026-01-22 12:26:50 +00:00
yongkangc
64dc89c96e fix(rocksdb): check actual block numbers inside sentinel shards
Previously, invariant checks only looked at the highest_block_number in
shard keys, treating sentinel shards (u64::MAX) as 'no history'. This was
incorrect because sentinel shards contain actual block numbers that must
be checked for consistency.

Changes:
- check_storages_history_consistency: now reads sentinel shard contents
  and uses max(highest_block, max_sentinel_block) for consistency checks
- check_accounts_history_consistency: same sentinel shard handling
- prune_storages_history_above: now filters sentinel shard contents
  rather than deleting entire shards, preserving valid history
- prune_accounts_history_above: same filtering for sentinel shards
- Updated tests to set checkpoints matching sentinel shard data
2026-01-22 12:08:53 +00:00
6 changed files with 146 additions and 214 deletions

View File

@@ -1351,8 +1351,7 @@ mod rocksdb_tests {
// Run queries against both backends using EitherReader
let mdbx_ro = factory.database_provider_ro().unwrap();
// Use `with_assume_history_complete()` since both backends have identical data
let rocks_tx = rocks_provider.tx().with_assume_history_complete();
let rocks_tx = rocks_provider.tx();
for (i, query) in queries.iter().enumerate() {
// MDBX query via EitherReader
@@ -1444,8 +1443,7 @@ mod rocksdb_tests {
// Run queries against both backends using EitherReader
let mdbx_ro = factory.database_provider_ro().unwrap();
// Use `with_assume_history_complete()` since both backends have identical data
let rocks_tx = rocks_provider.tx().with_assume_history_complete();
let rocks_tx = rocks_provider.tx();
for (i, query) in queries.iter().enumerate() {
// MDBX query via EitherReader

View File

@@ -2,7 +2,7 @@ use crate::{
changesets_utils::StorageRevertsIter,
providers::{
database::{chain::ChainStorage, metrics},
rocksdb::{PendingHistory, PendingRocksDBBatches, RocksDBProvider, RocksDBWriteCtx},
rocksdb::{PendingRocksDBBatches, RocksDBProvider, RocksDBWriteCtx},
static_file::{StaticFileWriteCtx, StaticFileWriter},
NodeTypesForProvider, StaticFileProvider,
},
@@ -186,10 +186,6 @@ pub struct DatabaseProvider<TX, N: NodeTypes> {
/// Pending `RocksDB` batches to be committed at provider commit time.
#[cfg_attr(not(all(unix, feature = "rocksdb")), allow(dead_code))]
pending_rocksdb_batches: PendingRocksDBBatches,
/// Pending history writes accumulated across `save_blocks()` calls.
/// Materialized into `RocksDB` shards at commit time after MDBX commits.
#[cfg_attr(not(all(unix, feature = "rocksdb")), allow(dead_code))]
pending_rocksdb_history: PendingHistory,
/// Minimum distance from tip required for pruning
minimum_pruning_distance: u64,
/// Database provider metrics
@@ -208,7 +204,6 @@ impl<TX: Debug, N: NodeTypes> Debug for DatabaseProvider<TX, N> {
.field("rocksdb_provider", &self.rocksdb_provider)
.field("changeset_cache", &self.changeset_cache)
.field("pending_rocksdb_batches", &"<pending batches>")
.field("pending_rocksdb_history", &"<pending history>")
.field("minimum_pruning_distance", &self.minimum_pruning_distance)
.finish()
}
@@ -342,7 +337,6 @@ impl<TX: DbTxMut, N: NodeTypes> DatabaseProvider<TX, N> {
rocksdb_provider,
changeset_cache,
pending_rocksdb_batches: Default::default(),
pending_rocksdb_history: Default::default(),
minimum_pruning_distance: MINIMUM_PRUNING_DISTANCE,
metrics: metrics::DatabaseProviderMetrics::default(),
}
@@ -416,7 +410,6 @@ impl<TX: DbTx + DbTxMut + 'static, N: NodeTypesForProvider> DatabaseProvider<TX,
prune_tx_lookup: self.prune_modes.transaction_lookup,
storage_settings: self.cached_storage_settings(),
pending_batches: self.pending_rocksdb_batches.clone(),
pending_history: self.pending_rocksdb_history.clone(),
}
}
@@ -883,7 +876,6 @@ impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
rocksdb_provider,
changeset_cache,
pending_rocksdb_batches: Default::default(),
pending_rocksdb_history: Default::default(),
minimum_pruning_distance: MINIMUM_PRUNING_DISTANCE,
metrics: metrics::DatabaseProviderMetrics::default(),
}
@@ -3486,19 +3478,6 @@ impl<TX: DbTx + 'static, N: NodeTypes + 'static> DBProvider for DatabaseProvider
}
/// Commit database transaction, static files, and pending `RocksDB` batches.
///
/// # Commit Order (Critical for Correctness)
///
/// `RocksDB` history indices must commit AFTER MDBX changesets to prevent a race
/// condition where history indices point to non-existent changesets:
///
/// 1. Static files finalize (headers, transactions, receipts)
/// 2. MDBX commits (changesets become visible)
/// 3. `RocksDB` batches commit (non-history data like tx hashes)
/// 4. `RocksDB` history commits (account/storage history indices)
///
/// This ordering ensures that when a reader follows a `RocksDB` history index
/// to a changeset, the changeset exists in MDBX.
fn commit(self) -> ProviderResult<()> {
// For unwinding it makes more sense to commit the database first, since if
// it is interrupted before the static files commit, we can just
@@ -3509,15 +3488,10 @@ impl<TX: DbTx + 'static, N: NodeTypes + 'static> DBProvider for DatabaseProvider
#[cfg(all(unix, feature = "rocksdb"))]
{
// Commit non-history batches
let batches = std::mem::take(&mut *self.pending_rocksdb_batches.lock());
for batch in batches {
self.rocksdb_provider.commit_batch(batch)?;
}
// Commit pending history (after MDBX, so changesets exist)
let history = std::mem::take(&mut *self.pending_rocksdb_history.lock());
self.rocksdb_provider.commit_pending_history(history)?;
}
self.static_file_provider.commit()?;
@@ -3529,30 +3503,20 @@ impl<TX: DbTx + 'static, N: NodeTypes + 'static> DBProvider for DatabaseProvider
self.static_file_provider.finalize()?;
timings.sf = start.elapsed();
// CRITICAL: MDBX must commit BEFORE RocksDB history indices.
// This prevents a race where history says "look at changeset N" but
// the changeset doesn't exist yet.
let start = Instant::now();
self.tx.commit()?;
timings.mdbx = start.elapsed();
#[cfg(all(unix, feature = "rocksdb"))]
{
let start = Instant::now();
// Commit non-history batches (tx hashes, etc.)
let batches = std::mem::take(&mut *self.pending_rocksdb_batches.lock());
for batch in batches {
self.rocksdb_provider.commit_batch(batch)?;
}
// Commit pending history (after MDBX, so changesets exist)
let history = std::mem::take(&mut *self.pending_rocksdb_history.lock());
self.rocksdb_provider.commit_pending_history(history)?;
timings.rocksdb = start.elapsed();
}
let start = Instant::now();
self.tx.commit()?;
timings.mdbx = start.elapsed();
self.metrics.record_commit(&timings);
}

View File

@@ -262,7 +262,7 @@ impl RocksDBProvider {
let (key, value) = result?;
let highest = key.sharded_key.highest_block_number;
if highest == u64::MAX {
if let Some(max_in_shard) = value.iter().max() &&
if let Some(max_in_shard) = value.max() &&
max_in_shard > max_sentinel_block
{
max_sentinel_block = max_in_shard;
@@ -416,7 +416,7 @@ impl RocksDBProvider {
let highest = key.highest_block_number;
if highest == u64::MAX {
// Sentinel shard: check the actual max block number in the shard
if let Some(max_in_shard) = value.iter().max() &&
if let Some(max_in_shard) = value.max() &&
max_in_shard > max_sentinel_block
{
max_sentinel_block = max_in_shard;
@@ -1064,7 +1064,7 @@ mod tests {
// This simulates a scenario where history tracking started but no shards were completed
let key_sentinel_1 = StorageShardedKey::new(Address::ZERO, B256::ZERO, u64::MAX);
let key_sentinel_2 = StorageShardedKey::new(Address::random(), B256::random(), u64::MAX);
// Sentinel shards contain blocks [10, 20, 30], so max block in shard = 30
// Use a checkpoint that matches the sentinel shard contents (max block = 30)
let block_list = BlockNumberList::new_pre_sorted([10, 20, 30]);
rocksdb.put::<tables::StoragesHistory>(key_sentinel_1, &block_list).unwrap();
rocksdb.put::<tables::StoragesHistory>(key_sentinel_2, &block_list).unwrap();
@@ -1078,8 +1078,8 @@ mod tests {
StorageSettings::legacy().with_storages_history_in_rocksdb(true),
);
// Checkpoint = 30 matches sentinel shard max block, simulating "consistent" state
// where indexing is complete through the checkpoint block
// Set a checkpoint that matches the sentinel shard data (max block = 30)
// This is a normal scenario where sentinel shards exist and are in sync with checkpoint
{
let provider = factory.database_provider_rw().unwrap();
provider
@@ -1108,7 +1108,7 @@ mod tests {
// Insert ONLY sentinel entries (highest_block_number = u64::MAX)
let key_sentinel_1 = ShardedKey::new(Address::ZERO, u64::MAX);
let key_sentinel_2 = ShardedKey::new(Address::random(), u64::MAX);
// Sentinel shards contain blocks [10, 20, 30], so max block in shard = 30
// Use a checkpoint that matches the sentinel shard contents (max block = 30)
let block_list = BlockNumberList::new_pre_sorted([10, 20, 30]);
rocksdb.put::<tables::AccountsHistory>(key_sentinel_1, &block_list).unwrap();
rocksdb.put::<tables::AccountsHistory>(key_sentinel_2, &block_list).unwrap();
@@ -1122,8 +1122,8 @@ mod tests {
StorageSettings::legacy().with_account_history_in_rocksdb(true),
);
// Checkpoint = 30 matches sentinel shard max block, simulating "consistent" state
// where indexing is complete through the checkpoint block
// Set a checkpoint that matches the sentinel shard data (max block = 30)
// This is a normal scenario where sentinel shards exist and are in sync with checkpoint
{
let provider = factory.database_provider_rw().unwrap();
provider
@@ -1466,4 +1466,101 @@ mod tests {
"Should require unwind to block 50 to rebuild AccountsHistory"
);
}
#[test]
fn test_prune_storages_history_sentinel_shard_rewrite() {
use reth_db_api::models::storage_sharded_key::StorageShardedKey;
let temp_dir = TempDir::new().unwrap();
let rocksdb = RocksDBBuilder::new(temp_dir.path())
.with_table::<tables::StoragesHistory>()
.build()
.unwrap();
let address = Address::random();
let storage_key = B256::random();
// Create a sentinel shard with blocks both above and below checkpoint 100
// Contains [10, 50, 150] - after pruning at checkpoint 100, should become [10, 50]
let sentinel_key = StorageShardedKey::new(address, storage_key, u64::MAX);
let block_list = BlockNumberList::new_pre_sorted([10, 50, 150]);
rocksdb.put::<tables::StoragesHistory>(sentinel_key.clone(), &block_list).unwrap();
// Create a test provider factory for MDBX
let factory = create_test_provider_factory();
factory.set_storage_settings_cache(
StorageSettings::legacy().with_storages_history_in_rocksdb(true),
);
// Set checkpoint to block 100
{
let provider = factory.database_provider_rw().unwrap();
provider
.save_stage_checkpoint(StageId::IndexStorageHistory, StageCheckpoint::new(100))
.unwrap();
provider.commit().unwrap();
}
let provider = factory.database_provider_ro().unwrap();
// Run consistency check - should prune the sentinel shard
let result = rocksdb.check_consistency(&provider).unwrap();
assert_eq!(result, None, "Should heal by pruning, no unwind needed");
// Verify the sentinel shard was rewritten (not deleted) with only [10, 50]
let rewritten = rocksdb.get::<tables::StoragesHistory>(sentinel_key).unwrap();
assert!(rewritten.is_some(), "Sentinel shard should be rewritten, not deleted");
let rewritten_list = rewritten.unwrap();
let blocks: Vec<u64> = rewritten_list.iter().collect();
assert_eq!(blocks, vec![10, 50], "Sentinel shard should contain only blocks <= 100");
}
#[test]
fn test_prune_accounts_history_sentinel_shard_rewrite() {
use reth_db_api::models::ShardedKey;
let temp_dir = TempDir::new().unwrap();
let rocksdb = RocksDBBuilder::new(temp_dir.path())
.with_table::<tables::AccountsHistory>()
.build()
.unwrap();
let address = Address::random();
// Create a sentinel shard with blocks both above and below checkpoint 100
// Contains [10, 50, 150] - after pruning at checkpoint 100, should become [10, 50]
let sentinel_key = ShardedKey::new(address, u64::MAX);
let block_list = BlockNumberList::new_pre_sorted([10, 50, 150]);
rocksdb.put::<tables::AccountsHistory>(sentinel_key.clone(), &block_list).unwrap();
// Create a test provider factory for MDBX
let factory = create_test_provider_factory();
factory.set_storage_settings_cache(
StorageSettings::legacy().with_account_history_in_rocksdb(true),
);
// Set checkpoint to block 100
{
let provider = factory.database_provider_rw().unwrap();
provider
.save_stage_checkpoint(StageId::IndexAccountHistory, StageCheckpoint::new(100))
.unwrap();
provider.commit().unwrap();
}
let provider = factory.database_provider_ro().unwrap();
// Run consistency check - should prune the sentinel shard
let result = rocksdb.check_consistency(&provider).unwrap();
assert_eq!(result, None, "Should heal by pruning, no unwind needed");
// Verify the sentinel shard was rewritten (not deleted) with only [10, 50]
let rewritten = rocksdb.get::<tables::AccountsHistory>(sentinel_key).unwrap();
assert!(rewritten.is_some(), "Sentinel shard should be rewritten, not deleted");
let rewritten_list = rewritten.unwrap();
let blocks: Vec<u64> = rewritten_list.iter().collect();
assert_eq!(blocks, vec![10, 50], "Sentinel shard should contain only blocks <= 100");
}
}

View File

@@ -4,10 +4,7 @@ mod invariants;
mod metrics;
mod provider;
#[allow(unused_imports)]
pub(crate) use provider::{
PendingHistory, PendingHistoryWrites, PendingRocksDBBatches, RocksDBWriteCtx,
};
pub(crate) use provider::{PendingRocksDBBatches, RocksDBWriteCtx};
pub use provider::{
RocksDBBatch, RocksDBBuilder, RocksDBProvider, RocksDBRawIter, RocksDBTableStats, RocksTx,
};

View File

@@ -15,7 +15,7 @@ use reth_db_api::{
table::{Compress, Decode, Decompress, Encode, Table},
tables, BlockNumberList, DatabaseError,
};
use reth_primitives_traits::{AlloyBlockHeader as _, BlockBody as _};
use reth_primitives_traits::BlockBody as _;
use reth_prune_types::PruneMode;
use reth_storage_errors::{
db::{DatabaseErrorInfo, DatabaseWriteError, DatabaseWriteOperation, LogLevel},
@@ -57,24 +57,6 @@ pub struct RocksDBTableStats {
pub pending_compaction_bytes: u64,
}
/// Pending history writes that are accumulated across multiple `save_blocks()` calls
/// and materialized into `RocksDB` shards at commit time.
///
/// This exists because [`RocksDBBatch::append_account_history_shard`] and
/// [`RocksDBBatch::append_storage_history_shard`] read from committed state, not pending writes.
/// By accumulating all history deltas in memory and materializing once at commit time, we ensure
/// each key is written exactly once with all its indices.
#[derive(Debug, Default)]
pub(crate) struct PendingHistoryWrites {
/// Account history: address -> list of block numbers where the account was modified.
pub accounts: BTreeMap<Address, Vec<u64>>,
/// Storage history: (address, slot) -> list of block numbers where the storage was modified.
pub storages: BTreeMap<(Address, B256), Vec<u64>>,
}
/// Pending history writes type alias.
pub(crate) type PendingHistory = Arc<Mutex<PendingHistoryWrites>>;
/// Context for `RocksDB` block writes.
#[derive(Clone)]
pub(crate) struct RocksDBWriteCtx {
@@ -86,9 +68,6 @@ pub(crate) struct RocksDBWriteCtx {
pub storage_settings: StorageSettings,
/// Pending batches to push to after writing.
pub pending_batches: PendingRocksDBBatches,
/// Pending history writes accumulated across `save_blocks()` calls.
/// Materialized into `RocksDB` shards at commit time.
pub pending_history: PendingHistory,
}
impl fmt::Debug for RocksDBWriteCtx {
@@ -98,7 +77,6 @@ impl fmt::Debug for RocksDBWriteCtx {
.field("prune_tx_lookup", &self.prune_tx_lookup)
.field("storage_settings", &self.storage_settings)
.field("pending_batches", &"<pending batches>")
.field("pending_history", &"<pending history>")
.finish()
}
}
@@ -616,7 +594,7 @@ impl RocksDBProvider {
let write_options = WriteOptions::default();
let txn_options = OptimisticTransactionOptions::default();
let inner = self.0.db_rw().transaction_opt(&write_options, &txn_options);
RocksTx { inner, provider: self, assume_history_complete: false }
RocksTx { inner, provider: self }
}
/// Creates a new batch for atomic writes.
@@ -1079,109 +1057,56 @@ impl RocksDBProvider {
Ok(())
}
/// Accumulates account history indices for the given blocks.
///
/// Indices are accumulated in `ctx.pending_history` and materialized at commit time
/// by [`RocksDBProvider::commit_pending_history`]. This ensures each key is written
/// exactly once, working around `RocksDB` batch reads not seeing uncommitted writes.
///
/// Only accounts with info changes (nonce, balance, `code_hash`) or lifecycle changes
/// (created/destroyed) are included, matching MDBX `AccountChangeSets` behavior.
/// Writes account history indices for the given blocks.
#[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
fn write_account_history<N: reth_node_types::NodePrimitives>(
&self,
blocks: &[ExecutedBlock<N>],
ctx: &RocksDBWriteCtx,
) -> ProviderResult<()> {
let mut local: BTreeMap<Address, Vec<u64>> = BTreeMap::new();
for block in blocks {
let block_number = block.recovered_block().number();
let mut batch = self.batch();
let mut account_history: BTreeMap<Address, Vec<u64>> = BTreeMap::new();
for (block_idx, block) in blocks.iter().enumerate() {
let block_number = ctx.first_block_number + block_idx as u64;
let bundle = &block.execution_outcome().state;
for (&address, account) in bundle.state() {
let info_changed = account.is_info_changed();
let was_destroyed = account.was_destroyed();
if info_changed || was_destroyed {
local.entry(address).or_default().push(block_number);
}
for &address in bundle.state().keys() {
account_history.entry(address).or_default().push(block_number);
}
}
let mut pending = ctx.pending_history.lock();
for (address, mut indices) in local {
pending.accounts.entry(address).or_default().append(&mut indices);
// Write account history using proper shard append logic
for (address, indices) in account_history {
batch.append_account_history_shard(address, indices)?;
}
ctx.pending_batches.lock().push(batch.into_inner());
Ok(())
}
/// Accumulates storage history indices for the given blocks.
///
/// Indices are accumulated in `ctx.pending_history` and materialized at commit time
/// by [`RocksDBProvider::commit_pending_history`]. This ensures each key is written
/// exactly once, working around `RocksDB` batch reads not seeing uncommitted writes.
///
/// Only storage slots with actual value changes are included, matching MDBX behavior.
/// Writes storage history indices for the given blocks.
#[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
fn write_storage_history<N: reth_node_types::NodePrimitives>(
&self,
blocks: &[ExecutedBlock<N>],
ctx: &RocksDBWriteCtx,
) -> ProviderResult<()> {
let mut local: BTreeMap<(Address, B256), Vec<u64>> = BTreeMap::new();
for block in blocks {
let block_number = block.recovered_block().number();
let mut batch = self.batch();
let mut storage_history: BTreeMap<(Address, B256), Vec<u64>> = BTreeMap::new();
for (block_idx, block) in blocks.iter().enumerate() {
let block_number = ctx.first_block_number + block_idx as u64;
let bundle = &block.execution_outcome().state;
for (&address, account) in bundle.state() {
for (&slot, storage_slot) in &account.storage {
if storage_slot.is_changed() {
let key = B256::new(slot.to_be_bytes());
local.entry((address, key)).or_default().push(block_number);
}
for &slot in account.storage.keys() {
let key = B256::new(slot.to_be_bytes());
storage_history.entry((address, key)).or_default().push(block_number);
}
}
}
let mut pending = ctx.pending_history.lock();
for ((address, slot), mut indices) in local {
pending.storages.entry((address, slot)).or_default().append(&mut indices);
}
Ok(())
}
/// Materializes pending history writes into `RocksDB` shards and commits them.
///
/// This is called at provider commit time, AFTER MDBX has committed. Each key is
/// written exactly once, ensuring [`RocksDBBatch::append_account_history_shard`] and
/// [`RocksDBBatch::append_storage_history_shard`] see committed state.
pub(crate) fn commit_pending_history(
&self,
history: PendingHistoryWrites,
) -> ProviderResult<()> {
if history.accounts.is_empty() && history.storages.is_empty() {
return Ok(());
}
let mut batch = self.batch();
// Write account history shards
for (address, mut indices) in history.accounts {
// Sort and dedup in case of duplicate block numbers from multiple save_blocks calls
indices.sort_unstable();
indices.dedup();
batch.append_account_history_shard(address, indices)?;
}
// Write storage history shards
for ((address, slot), mut indices) in history.storages {
indices.sort_unstable();
indices.dedup();
// Write storage history using proper shard append logic
for ((address, slot), indices) in storage_history {
batch.append_storage_history_shard(address, slot, indices)?;
}
if !batch.is_empty() {
batch.commit()?;
}
ctx.pending_batches.lock().push(batch.into_inner());
Ok(())
}
}
@@ -1596,14 +1521,10 @@ impl<'a> RocksDBBatch<'a> {
/// - Iteration over uncommitted data
///
/// Note: `Transaction` is `Send` but NOT `Sync`. This wrapper does not implement
/// `DbTx`/`DbTxMut` traits directly; use `RocksDB`-specific methods instead.
/// `DbTx`/`DbTxMut` traits directly; use RocksDB-specific methods instead.
pub struct RocksTx<'db> {
inner: Transaction<'db, OptimisticTransactionDB>,
provider: &'db RocksDBProvider,
/// When true, assume `RocksDB` has complete history (like MDBX) and return `NotYetWritten`
/// when querying before the first history entry. When false (default), return
/// `MaybeInPlainState` for hybrid storage safety.
assume_history_complete: bool,
}
impl fmt::Debug for RocksTx<'_> {
@@ -1613,20 +1534,6 @@ impl fmt::Debug for RocksTx<'_> {
}
impl<'db> RocksTx<'db> {
/// Sets the `assume_history_complete` flag to true.
///
/// When enabled, history queries will return `NotYetWritten` (like MDBX) instead of
/// `MaybeInPlainState` when querying before the first history entry.
///
/// WARNING: Only use in tests where `RocksDB` has complete history (identical to MDBX).
/// In production, `RocksDB` may have partial history, so this flag would cause incorrect
/// behavior (treating accounts as non-existent when they exist in MDBX).
#[cfg(test)]
pub const fn with_assume_history_complete(mut self) -> Self {
self.assume_history_complete = true;
self
}
/// Gets a value from the specified table. Sees uncommitted writes in this transaction.
pub fn get<T: Table>(&self, key: T::Key) -> ProviderResult<Option<T::Value>> {
let encoded_key = key.encode();
@@ -1793,19 +1700,13 @@ impl<'db> RocksTx<'db> {
where
T: Table<Value = BlockNumberList>,
{
// When RocksDB can't find history for an address, we must fall back to plain state.
//
// This is critical for hybrid storage: RocksDB only contains history indices for blocks
// written AFTER it was enabled. Accounts modified before that exist only in MDBX.
// Returning `NotYetWritten` would incorrectly treat them as non-existent (nonce 0).
//
// When `assume_history_complete` is true (for tests with identical data), return
// `NotYetWritten` to match MDBX semantics.
// History may be pruned if a lowest available block is set.
let is_maybe_pruned = lowest_available_block_number.is_some();
let fallback = || {
Ok(if self.assume_history_complete {
HistoryInfo::NotYetWritten
} else {
Ok(if is_maybe_pruned {
HistoryInfo::MaybeInPlainState
} else {
HistoryInfo::NotYetWritten
})
};
@@ -1856,27 +1757,11 @@ impl<'db> RocksTx<'db> {
false
};
// For RocksDB, we must handle `is_before_first_write` specially.
//
// When RocksDB says the target block is before the first recorded write:
// - MDBX would return `NotYetWritten` (account doesn't exist) because MDBX has full history
// - RocksDB should return `MaybeInPlainState` because the account may exist in MDBX from
// before RocksDB was enabled
//
// We use `from_lookup` but then override `NotYetWritten` -> `MaybeInPlainState`
// unless `assume_history_complete` is true (for tests with identical data).
let result = HistoryInfo::from_lookup(
Ok(HistoryInfo::from_lookup(
found_block,
is_before_first_write,
lowest_available_block_number,
);
Ok(match result {
HistoryInfo::NotYetWritten if !self.assume_history_complete => {
HistoryInfo::MaybeInPlainState
}
other => other,
})
))
}
/// Returns an error if the raw iterator is in an invalid state due to an I/O error.
@@ -2372,7 +2257,7 @@ mod tests {
/// Tests the edge case where block < `lowest_available_block_number`.
/// This case cannot be tested via `HistoricalStateProviderRef` (which errors before lookup),
/// so we keep this `RocksDB`-specific test to verify the low-level behavior.
/// so we keep this RocksDB-specific test to verify the low-level behavior.
#[test]
fn test_account_history_info_pruned_before_first_entry() {
let temp_dir = TempDir::new().unwrap();

View File

@@ -32,13 +32,6 @@ pub struct RocksDBTableStats {
pub pending_compaction_bytes: u64,
}
/// Pending history writes (stub - empty struct).
#[derive(Debug, Default)]
pub(crate) struct PendingHistoryWrites;
/// Pending history writes type alias (stub).
pub(crate) type PendingHistory = Arc<Mutex<PendingHistoryWrites>>;
/// Context for `RocksDB` block writes (stub).
#[derive(Debug, Clone)]
#[allow(dead_code)]
@@ -51,8 +44,6 @@ pub(crate) struct RocksDBWriteCtx {
pub storage_settings: StorageSettings,
/// Pending batches (stub - unused).
pub pending_batches: PendingRocksDBBatches,
/// Pending history writes (stub - unused).
pub pending_history: PendingHistory,
}
/// A stub `RocksDB` provider.