Compare commits

..

3 Commits

Author SHA1 Message Date
yongkangc
e10c071279 fix: clippy and fmt issues
- Add backticks to RocksDB and MDBX in doc comments
- Make with_assume_history_complete const fn
2026-01-22 14:56:13 +00:00
yongkangc
03a89ddaf1 fix: reintroduce pruning awareness in history_info softening
When deciding whether to soften NotYetWritten -> MaybeInPlainState,
we must consider both:
1. assume_history_complete flag (for hybrid storage)
2. lowest_available_block_number (for pruned history)

We only return NotYetWritten when history is complete AND not pruned.
This prevents incorrectly treating pruned entries as 'never written'.
2026-01-22 13:38:01 +00:00
yongkangc
d9c9960fd7 fix(rocksdb): return MaybeInPlainState for missing history entries
RocksDB only has history for blocks AFTER it was enabled. For accounts
that existed before RocksDB was enabled, returning NotYetWritten
incorrectly treats them as non-existent (nonce 0). We now return
MaybeInPlainState to trigger a plain state lookup.

Added assume_history_complete flag to RocksTx that:
- When false (default): returns MaybeInPlainState for hybrid storage
- When true: returns NotYetWritten to match MDBX semantics for tests

This preserves the correct hybrid storage behavior in production while
allowing tests with identical data to verify semantic equivalence.
2026-01-22 12:58:35 +00:00
5 changed files with 101 additions and 297 deletions

View File

@@ -2,7 +2,7 @@ use crate::{
changesets_utils::StorageRevertsIter,
providers::{
database::{chain::ChainStorage, metrics},
rocksdb::{PendingHistory, PendingRocksDBBatches, RocksDBProvider, RocksDBWriteCtx},
rocksdb::{PendingRocksDBBatches, RocksDBProvider, RocksDBWriteCtx},
static_file::{StaticFileWriteCtx, StaticFileWriter},
NodeTypesForProvider, StaticFileProvider,
},
@@ -186,10 +186,6 @@ pub struct DatabaseProvider<TX, N: NodeTypes> {
/// Pending `RocksDB` batches to be committed at provider commit time.
#[cfg_attr(not(all(unix, feature = "rocksdb")), allow(dead_code))]
pending_rocksdb_batches: PendingRocksDBBatches,
/// Pending history writes accumulated across `save_blocks()` calls.
/// Materialized into `RocksDB` shards at commit time after MDBX commits.
#[cfg_attr(not(all(unix, feature = "rocksdb")), allow(dead_code))]
pending_rocksdb_history: PendingHistory,
/// Minimum distance from tip required for pruning
minimum_pruning_distance: u64,
/// Database provider metrics
@@ -208,7 +204,6 @@ impl<TX: Debug, N: NodeTypes> Debug for DatabaseProvider<TX, N> {
.field("rocksdb_provider", &self.rocksdb_provider)
.field("changeset_cache", &self.changeset_cache)
.field("pending_rocksdb_batches", &"<pending batches>")
.field("pending_rocksdb_history", &"<pending history>")
.field("minimum_pruning_distance", &self.minimum_pruning_distance)
.finish()
}
@@ -342,7 +337,6 @@ impl<TX: DbTxMut, N: NodeTypes> DatabaseProvider<TX, N> {
rocksdb_provider,
changeset_cache,
pending_rocksdb_batches: Default::default(),
pending_rocksdb_history: Default::default(),
minimum_pruning_distance: MINIMUM_PRUNING_DISTANCE,
metrics: metrics::DatabaseProviderMetrics::default(),
}
@@ -416,7 +410,6 @@ impl<TX: DbTx + DbTxMut + 'static, N: NodeTypesForProvider> DatabaseProvider<TX,
prune_tx_lookup: self.prune_modes.transaction_lookup,
storage_settings: self.cached_storage_settings(),
pending_batches: self.pending_rocksdb_batches.clone(),
pending_history: self.pending_rocksdb_history.clone(),
}
}
@@ -883,7 +876,6 @@ impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
rocksdb_provider,
changeset_cache,
pending_rocksdb_batches: Default::default(),
pending_rocksdb_history: Default::default(),
minimum_pruning_distance: MINIMUM_PRUNING_DISTANCE,
metrics: metrics::DatabaseProviderMetrics::default(),
}
@@ -3486,19 +3478,6 @@ impl<TX: DbTx + 'static, N: NodeTypes + 'static> DBProvider for DatabaseProvider
}
/// Commit database transaction, static files, and pending `RocksDB` batches.
///
/// # Commit Order (Critical for Correctness)
///
/// `RocksDB` history indices must commit AFTER MDBX changesets to prevent a race
/// condition where history indices point to non-existent changesets:
///
/// 1. Static files finalize (headers, transactions, receipts)
/// 2. MDBX commits (changesets become visible)
/// 3. `RocksDB` batches commit (non-history data like tx hashes)
/// 4. `RocksDB` history commits (account/storage history indices)
///
/// This ordering ensures that when a reader follows a `RocksDB` history index
/// to a changeset, the changeset exists in MDBX.
fn commit(self) -> ProviderResult<()> {
// For unwinding it makes more sense to commit the database first, since if
// it is interrupted before the static files commit, we can just
@@ -3509,15 +3488,10 @@ impl<TX: DbTx + 'static, N: NodeTypes + 'static> DBProvider for DatabaseProvider
#[cfg(all(unix, feature = "rocksdb"))]
{
// Commit non-history batches
let batches = std::mem::take(&mut *self.pending_rocksdb_batches.lock());
for batch in batches {
self.rocksdb_provider.commit_batch(batch)?;
}
// Commit pending history (after MDBX, so changesets exist)
let history = std::mem::take(&mut *self.pending_rocksdb_history.lock());
self.rocksdb_provider.commit_pending_history(history)?;
}
self.static_file_provider.commit()?;
@@ -3529,30 +3503,20 @@ impl<TX: DbTx + 'static, N: NodeTypes + 'static> DBProvider for DatabaseProvider
self.static_file_provider.finalize()?;
timings.sf = start.elapsed();
// CRITICAL: MDBX must commit BEFORE RocksDB history indices.
// This prevents a race where history says "look at changeset N" but
// the changeset doesn't exist yet.
let start = Instant::now();
self.tx.commit()?;
timings.mdbx = start.elapsed();
#[cfg(all(unix, feature = "rocksdb"))]
{
let start = Instant::now();
// Commit non-history batches (tx hashes, etc.)
let batches = std::mem::take(&mut *self.pending_rocksdb_batches.lock());
for batch in batches {
self.rocksdb_provider.commit_batch(batch)?;
}
// Commit pending history (after MDBX, so changesets exist)
let history = std::mem::take(&mut *self.pending_rocksdb_history.lock());
self.rocksdb_provider.commit_pending_history(history)?;
timings.rocksdb = start.elapsed();
}
let start = Instant::now();
self.tx.commit()?;
timings.mdbx = start.elapsed();
self.metrics.record_commit(&timings);
}

View File

@@ -253,21 +253,14 @@ impl RocksDBProvider {
return Ok(None);
}
// Find the max block number across all entries, including sentinel shards.
// Find the max highest_block_number (excluding u64::MAX sentinel) across all
// entries. Also track if we found any non-sentinel entries.
let mut max_highest_block = 0u64;
let mut max_sentinel_block = 0u64;
let mut found_non_sentinel = false;
for result in self.iter::<tables::StoragesHistory>()? {
let (key, value) = result?;
let (key, _) = result?;
let highest = key.sharded_key.highest_block_number;
if highest == u64::MAX {
if let Some(max_in_shard) = value.iter().max() &&
max_in_shard > max_sentinel_block
{
max_sentinel_block = max_in_shard;
}
} else {
if highest != u64::MAX {
found_non_sentinel = true;
if highest > max_highest_block {
max_highest_block = highest;
@@ -275,34 +268,31 @@ impl RocksDBProvider {
}
}
let effective_max = max_highest_block.max(max_sentinel_block);
// If we only have sentinel shards and they're all empty, we're consistent
if !found_non_sentinel && max_sentinel_block == 0 {
// If all entries are sentinel entries (u64::MAX), treat as first-run scenario.
// This means no completed shards exist (only sentinel shards with
// highest_block_number=u64::MAX), so no actual history has been indexed.
if !found_non_sentinel {
return Ok(None);
}
// If any entry has data > checkpoint, prune excess
if effective_max > checkpoint {
// If any entry has highest_block > checkpoint, prune excess
if max_highest_block > checkpoint {
tracing::info!(
target: "reth::providers::rocksdb",
rocks_highest = effective_max,
max_non_sentinel = max_highest_block,
max_sentinel = max_sentinel_block,
rocks_highest = max_highest_block,
checkpoint,
"StoragesHistory ahead of checkpoint, pruning excess data"
);
self.prune_storages_history_above(checkpoint)?;
return Ok(None);
} else if effective_max < checkpoint {
} else if max_highest_block < checkpoint {
// RocksDB is behind checkpoint, return highest block to signal unwind needed
tracing::warn!(
target: "reth::providers::rocksdb",
rocks_highest = effective_max,
rocks_highest = max_highest_block,
checkpoint,
"StoragesHistory behind checkpoint, unwind needed"
);
return Ok(Some(effective_max));
return Ok(Some(max_highest_block));
}
Ok(None)
@@ -319,42 +309,25 @@ impl RocksDBProvider {
/// For `StoragesHistory`, the key contains `highest_block_number`, so we can iterate
/// and delete entries where `key.sharded_key.highest_block_number > max_block`.
///
/// Sentinel shards (with `highest_block_number = u64::MAX`) require special handling:
/// we must read their contents and filter out block numbers > `max_block`.
///
/// TODO(<https://github.com/paradigmxyz/reth/issues/20417>): this iterates the whole table,
/// which is inefficient. Use changeset-based pruning instead.
fn prune_storages_history_above(&self, max_block: BlockNumber) -> ProviderResult<()> {
use reth_db_api::{models::storage_sharded_key::StorageShardedKey, BlockNumberList};
use reth_db_api::models::storage_sharded_key::StorageShardedKey;
let mut to_delete: Vec<StorageShardedKey> = Vec::new();
let mut to_rewrite: Vec<(StorageShardedKey, BlockNumberList)> = Vec::new();
for result in self.iter::<tables::StoragesHistory>()? {
let (key, value) = result?;
let (key, _) = result?;
let highest_block = key.sharded_key.highest_block_number;
if max_block == 0 {
to_delete.push(key);
} else if highest_block == u64::MAX {
let filtered: Vec<u64> = value.iter().filter(|&bn| bn <= max_block).collect();
if filtered.is_empty() {
to_delete.push(key);
} else if filtered.len() != value.len() as usize {
to_rewrite.push((key, BlockNumberList::new_pre_sorted(filtered)));
}
} else if highest_block > max_block {
if max_block == 0 || (highest_block != u64::MAX && highest_block > max_block) {
to_delete.push(key);
}
}
let deleted = to_delete.len();
let rewritten = to_rewrite.len();
if deleted > 0 || rewritten > 0 {
if deleted > 0 {
tracing::info!(
target: "reth::providers::rocksdb",
deleted_count = deleted,
rewritten_count = rewritten,
max_block,
"Pruning StoragesHistory entries"
);
@@ -363,9 +336,6 @@ impl RocksDBProvider {
for key in to_delete {
batch.delete::<tables::StoragesHistory>(key)?;
}
for (key, value) in to_rewrite {
batch.put::<tables::StoragesHistory>(key, &value)?;
}
batch.commit()?;
}
@@ -404,24 +374,14 @@ impl RocksDBProvider {
return Ok(None);
}
// Find the max block number across all entries, including sentinel shards.
// For sentinel shards (highest_block_number == u64::MAX), we need to look
// at the actual block numbers stored in the shard.
// Find the max highest_block_number (excluding u64::MAX sentinel) across all
// entries. Also track if we found any non-sentinel entries.
let mut max_highest_block = 0u64;
let mut max_sentinel_block = 0u64;
let mut found_non_sentinel = false;
for result in self.iter::<tables::AccountsHistory>()? {
let (key, value) = result?;
let (key, _) = result?;
let highest = key.highest_block_number;
if highest == u64::MAX {
// Sentinel shard: check the actual max block number in the shard
if let Some(max_in_shard) = value.iter().max() &&
max_in_shard > max_sentinel_block
{
max_sentinel_block = max_in_shard;
}
} else {
if highest != u64::MAX {
found_non_sentinel = true;
if highest > max_highest_block {
max_highest_block = highest;
@@ -429,22 +389,18 @@ impl RocksDBProvider {
}
}
// Use the higher of the two maxes
let effective_max = max_highest_block.max(max_sentinel_block);
// If we only have sentinel shards and they're all within checkpoint,
// we're consistent
if !found_non_sentinel && max_sentinel_block == 0 {
// If all entries are sentinel entries (u64::MAX), treat as first-run scenario.
// This means no completed shards exist (only sentinel shards with
// highest_block_number=u64::MAX), so no actual history has been indexed.
if !found_non_sentinel {
return Ok(None);
}
// If any entry has data > checkpoint, prune excess
if effective_max > checkpoint {
// If any entry has highest_block > checkpoint, prune excess
if max_highest_block > checkpoint {
tracing::info!(
target: "reth::providers::rocksdb",
rocks_highest = effective_max,
max_non_sentinel = max_highest_block,
max_sentinel = max_sentinel_block,
rocks_highest = max_highest_block,
checkpoint,
"AccountsHistory ahead of checkpoint, pruning excess data"
);
@@ -453,14 +409,14 @@ impl RocksDBProvider {
}
// If RocksDB is behind the checkpoint, request an unwind to rebuild.
if effective_max < checkpoint {
if max_highest_block < checkpoint {
tracing::warn!(
target: "reth::providers::rocksdb",
rocks_highest = effective_max,
rocks_highest = max_highest_block,
checkpoint,
"AccountsHistory behind checkpoint, unwind needed"
);
return Ok(Some(effective_max));
return Ok(Some(max_highest_block));
}
Ok(None)
@@ -478,47 +434,26 @@ impl RocksDBProvider {
/// `highest_block_number`, so we can iterate and delete entries where
/// `key.highest_block_number > max_block`.
///
/// Sentinel shards (with `highest_block_number = u64::MAX`) require special handling:
/// we must read their contents and filter out block numbers > `max_block`.
///
/// TODO(<https://github.com/paradigmxyz/reth/issues/20417>): this iterates the whole table,
/// which is inefficient. Use changeset-based pruning instead.
fn prune_accounts_history_above(&self, max_block: BlockNumber) -> ProviderResult<()> {
use alloy_primitives::Address;
use reth_db_api::{models::ShardedKey, BlockNumberList};
use reth_db_api::models::ShardedKey;
let mut to_delete: Vec<ShardedKey<Address>> = Vec::new();
let mut to_rewrite: Vec<(ShardedKey<Address>, BlockNumberList)> = Vec::new();
for result in self.iter::<tables::AccountsHistory>()? {
let (key, value) = result?;
let (key, _) = result?;
let highest_block = key.highest_block_number;
if max_block == 0 {
// Clear everything
to_delete.push(key);
} else if highest_block == u64::MAX {
// Sentinel shard: filter out block numbers > max_block
let filtered: Vec<u64> = value.iter().filter(|&bn| bn <= max_block).collect();
if filtered.is_empty() {
to_delete.push(key);
} else if filtered.len() != value.len() as usize {
// Some entries were filtered out, rewrite the shard
to_rewrite.push((key, BlockNumberList::new_pre_sorted(filtered)));
}
} else if highest_block > max_block {
// Non-sentinel shard above max_block: delete
if max_block == 0 || (highest_block != u64::MAX && highest_block > max_block) {
to_delete.push(key);
}
}
let deleted = to_delete.len();
let rewritten = to_rewrite.len();
if deleted > 0 || rewritten > 0 {
if deleted > 0 {
tracing::info!(
target: "reth::providers::rocksdb",
deleted_count = deleted,
rewritten_count = rewritten,
max_block,
"Pruning AccountsHistory entries"
);
@@ -527,9 +462,6 @@ impl RocksDBProvider {
for key in to_delete {
batch.delete::<tables::AccountsHistory>(key)?;
}
for (key, value) in to_rewrite {
batch.put::<tables::AccountsHistory>(key, &value)?;
}
batch.commit()?;
}
@@ -1064,7 +996,6 @@ mod tests {
// This simulates a scenario where history tracking started but no shards were completed
let key_sentinel_1 = StorageShardedKey::new(Address::ZERO, B256::ZERO, u64::MAX);
let key_sentinel_2 = StorageShardedKey::new(Address::random(), B256::random(), u64::MAX);
// Sentinel shards contain blocks [10, 20, 30], so max block in shard = 30
let block_list = BlockNumberList::new_pre_sorted([10, 20, 30]);
rocksdb.put::<tables::StoragesHistory>(key_sentinel_1, &block_list).unwrap();
rocksdb.put::<tables::StoragesHistory>(key_sentinel_2, &block_list).unwrap();
@@ -1078,21 +1009,24 @@ mod tests {
StorageSettings::legacy().with_storages_history_in_rocksdb(true),
);
// Checkpoint = 30 matches sentinel shard max block, simulating "consistent" state
// where indexing is complete through the checkpoint block
// Set a checkpoint indicating we should have processed up to block 100
{
let provider = factory.database_provider_rw().unwrap();
provider
.save_stage_checkpoint(StageId::IndexStorageHistory, StageCheckpoint::new(30))
.save_stage_checkpoint(StageId::IndexStorageHistory, StageCheckpoint::new(100))
.unwrap();
provider.commit().unwrap();
}
let provider = factory.database_provider_ro().unwrap();
// RocksDB has sentinel entries matching the checkpoint - consistent state
// RocksDB has only sentinel entries (no completed shards) but checkpoint is set.
// This is treated as a first-run/migration scenario - no unwind needed.
let result = rocksdb.check_consistency(&provider).unwrap();
assert_eq!(result, None, "Sentinel-only entries matching checkpoint should be consistent");
assert_eq!(
result, None,
"Sentinel-only entries with checkpoint should be treated as first run"
);
}
#[test]
@@ -1108,7 +1042,6 @@ mod tests {
// Insert ONLY sentinel entries (highest_block_number = u64::MAX)
let key_sentinel_1 = ShardedKey::new(Address::ZERO, u64::MAX);
let key_sentinel_2 = ShardedKey::new(Address::random(), u64::MAX);
// Sentinel shards contain blocks [10, 20, 30], so max block in shard = 30
let block_list = BlockNumberList::new_pre_sorted([10, 20, 30]);
rocksdb.put::<tables::AccountsHistory>(key_sentinel_1, &block_list).unwrap();
rocksdb.put::<tables::AccountsHistory>(key_sentinel_2, &block_list).unwrap();
@@ -1122,21 +1055,24 @@ mod tests {
StorageSettings::legacy().with_account_history_in_rocksdb(true),
);
// Checkpoint = 30 matches sentinel shard max block, simulating "consistent" state
// where indexing is complete through the checkpoint block
// Set a checkpoint indicating we should have processed up to block 100
{
let provider = factory.database_provider_rw().unwrap();
provider
.save_stage_checkpoint(StageId::IndexAccountHistory, StageCheckpoint::new(30))
.save_stage_checkpoint(StageId::IndexAccountHistory, StageCheckpoint::new(100))
.unwrap();
provider.commit().unwrap();
}
let provider = factory.database_provider_ro().unwrap();
// RocksDB has sentinel entries matching the checkpoint - consistent state
// RocksDB has only sentinel entries (no completed shards) but checkpoint is set.
// This is treated as a first-run/migration scenario - no unwind needed.
let result = rocksdb.check_consistency(&provider).unwrap();
assert_eq!(result, None, "Sentinel-only entries matching checkpoint should be consistent");
assert_eq!(
result, None,
"Sentinel-only entries with checkpoint should be treated as first run"
);
}
#[test]

View File

@@ -4,10 +4,7 @@ mod invariants;
mod metrics;
mod provider;
#[allow(unused_imports)]
pub(crate) use provider::{
PendingHistory, PendingHistoryWrites, PendingRocksDBBatches, RocksDBWriteCtx,
};
pub(crate) use provider::{PendingRocksDBBatches, RocksDBWriteCtx};
pub use provider::{
RocksDBBatch, RocksDBBuilder, RocksDBProvider, RocksDBRawIter, RocksDBTableStats, RocksTx,
};

View File

@@ -15,7 +15,7 @@ use reth_db_api::{
table::{Compress, Decode, Decompress, Encode, Table},
tables, BlockNumberList, DatabaseError,
};
use reth_primitives_traits::{AlloyBlockHeader as _, BlockBody as _};
use reth_primitives_traits::BlockBody as _;
use reth_prune_types::PruneMode;
use reth_storage_errors::{
db::{DatabaseErrorInfo, DatabaseWriteError, DatabaseWriteOperation, LogLevel},
@@ -57,24 +57,6 @@ pub struct RocksDBTableStats {
pub pending_compaction_bytes: u64,
}
/// Pending history writes that are accumulated across multiple `save_blocks()` calls
/// and materialized into `RocksDB` shards at commit time.
///
/// This exists because [`RocksDBBatch::append_account_history_shard`] and
/// [`RocksDBBatch::append_storage_history_shard`] read from committed state, not pending writes.
/// By accumulating all history deltas in memory and materializing once at commit time, we ensure
/// each key is written exactly once with all its indices.
#[derive(Debug, Default)]
pub(crate) struct PendingHistoryWrites {
/// Account history: address -> list of block numbers where the account was modified.
pub accounts: BTreeMap<Address, Vec<u64>>,
/// Storage history: (address, slot) -> list of block numbers where the storage was modified.
pub storages: BTreeMap<(Address, B256), Vec<u64>>,
}
/// Pending history writes type alias.
pub(crate) type PendingHistory = Arc<Mutex<PendingHistoryWrites>>;
/// Context for `RocksDB` block writes.
#[derive(Clone)]
pub(crate) struct RocksDBWriteCtx {
@@ -86,9 +68,6 @@ pub(crate) struct RocksDBWriteCtx {
pub storage_settings: StorageSettings,
/// Pending batches to push to after writing.
pub pending_batches: PendingRocksDBBatches,
/// Pending history writes accumulated across `save_blocks()` calls.
/// Materialized into `RocksDB` shards at commit time.
pub pending_history: PendingHistory,
}
impl fmt::Debug for RocksDBWriteCtx {
@@ -98,7 +77,6 @@ impl fmt::Debug for RocksDBWriteCtx {
.field("prune_tx_lookup", &self.prune_tx_lookup)
.field("storage_settings", &self.storage_settings)
.field("pending_batches", &"<pending batches>")
.field("pending_history", &"<pending history>")
.finish()
}
}
@@ -1079,109 +1057,56 @@ impl RocksDBProvider {
Ok(())
}
/// Accumulates account history indices for the given blocks.
///
/// Indices are accumulated in `ctx.pending_history` and materialized at commit time
/// by [`RocksDBProvider::commit_pending_history`]. This ensures each key is written
/// exactly once, working around `RocksDB` batch reads not seeing uncommitted writes.
///
/// Only accounts with info changes (nonce, balance, `code_hash`) or lifecycle changes
/// (created/destroyed) are included, matching MDBX `AccountChangeSets` behavior.
/// Writes account history indices for the given blocks.
#[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
fn write_account_history<N: reth_node_types::NodePrimitives>(
&self,
blocks: &[ExecutedBlock<N>],
ctx: &RocksDBWriteCtx,
) -> ProviderResult<()> {
let mut local: BTreeMap<Address, Vec<u64>> = BTreeMap::new();
for block in blocks {
let block_number = block.recovered_block().number();
let mut batch = self.batch();
let mut account_history: BTreeMap<Address, Vec<u64>> = BTreeMap::new();
for (block_idx, block) in blocks.iter().enumerate() {
let block_number = ctx.first_block_number + block_idx as u64;
let bundle = &block.execution_outcome().state;
for (&address, account) in bundle.state() {
let info_changed = account.is_info_changed();
let was_destroyed = account.was_destroyed();
if info_changed || was_destroyed {
local.entry(address).or_default().push(block_number);
}
for &address in bundle.state().keys() {
account_history.entry(address).or_default().push(block_number);
}
}
let mut pending = ctx.pending_history.lock();
for (address, mut indices) in local {
pending.accounts.entry(address).or_default().append(&mut indices);
// Write account history using proper shard append logic
for (address, indices) in account_history {
batch.append_account_history_shard(address, indices)?;
}
ctx.pending_batches.lock().push(batch.into_inner());
Ok(())
}
/// Accumulates storage history indices for the given blocks.
///
/// Indices are accumulated in `ctx.pending_history` and materialized at commit time
/// by [`RocksDBProvider::commit_pending_history`]. This ensures each key is written
/// exactly once, working around `RocksDB` batch reads not seeing uncommitted writes.
///
/// Only storage slots with actual value changes are included, matching MDBX behavior.
/// Writes storage history indices for the given blocks.
#[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
fn write_storage_history<N: reth_node_types::NodePrimitives>(
&self,
blocks: &[ExecutedBlock<N>],
ctx: &RocksDBWriteCtx,
) -> ProviderResult<()> {
let mut local: BTreeMap<(Address, B256), Vec<u64>> = BTreeMap::new();
for block in blocks {
let block_number = block.recovered_block().number();
let mut batch = self.batch();
let mut storage_history: BTreeMap<(Address, B256), Vec<u64>> = BTreeMap::new();
for (block_idx, block) in blocks.iter().enumerate() {
let block_number = ctx.first_block_number + block_idx as u64;
let bundle = &block.execution_outcome().state;
for (&address, account) in bundle.state() {
for (&slot, storage_slot) in &account.storage {
if storage_slot.is_changed() {
let key = B256::new(slot.to_be_bytes());
local.entry((address, key)).or_default().push(block_number);
}
for &slot in account.storage.keys() {
let key = B256::new(slot.to_be_bytes());
storage_history.entry((address, key)).or_default().push(block_number);
}
}
}
let mut pending = ctx.pending_history.lock();
for ((address, slot), mut indices) in local {
pending.storages.entry((address, slot)).or_default().append(&mut indices);
}
Ok(())
}
/// Materializes pending history writes into `RocksDB` shards and commits them.
///
/// This is called at provider commit time, AFTER MDBX has committed. Each key is
/// written exactly once, ensuring [`RocksDBBatch::append_account_history_shard`] and
/// [`RocksDBBatch::append_storage_history_shard`] see committed state.
pub(crate) fn commit_pending_history(
&self,
history: PendingHistoryWrites,
) -> ProviderResult<()> {
if history.accounts.is_empty() && history.storages.is_empty() {
return Ok(());
}
let mut batch = self.batch();
// Write account history shards
for (address, mut indices) in history.accounts {
// Sort and dedup in case of duplicate block numbers from multiple save_blocks calls
indices.sort_unstable();
indices.dedup();
batch.append_account_history_shard(address, indices)?;
}
// Write storage history shards
for ((address, slot), mut indices) in history.storages {
indices.sort_unstable();
indices.dedup();
// Write storage history using proper shard append logic
for ((address, slot), indices) in storage_history {
batch.append_storage_history_shard(address, slot, indices)?;
}
if !batch.is_empty() {
batch.commit()?;
}
ctx.pending_batches.lock().push(batch.into_inner());
Ok(())
}
}
@@ -1596,11 +1521,11 @@ impl<'a> RocksDBBatch<'a> {
/// - Iteration over uncommitted data
///
/// Note: `Transaction` is `Send` but NOT `Sync`. This wrapper does not implement
/// `DbTx`/`DbTxMut` traits directly; use `RocksDB`-specific methods instead.
/// `DbTx`/`DbTxMut` traits directly; use RocksDB-specific methods instead.
pub struct RocksTx<'db> {
inner: Transaction<'db, OptimisticTransactionDB>,
provider: &'db RocksDBProvider,
/// When true, assume `RocksDB` has complete history (like MDBX) and return `NotYetWritten`
/// When true, assume `RocksDB` has complete history (like `MDBX`) and return `NotYetWritten`
/// when querying before the first history entry. When false (default), return
/// `MaybeInPlainState` for hybrid storage safety.
assume_history_complete: bool,
@@ -1615,13 +1540,9 @@ impl fmt::Debug for RocksTx<'_> {
impl<'db> RocksTx<'db> {
/// Sets the `assume_history_complete` flag to true.
///
/// When enabled, history queries will return `NotYetWritten` (like MDBX) instead of
/// `MaybeInPlainState` when querying before the first history entry.
///
/// WARNING: Only use in tests where `RocksDB` has complete history (identical to MDBX).
/// In production, `RocksDB` may have partial history, so this flag would cause incorrect
/// behavior (treating accounts as non-existent when they exist in MDBX).
#[cfg(test)]
/// When enabled, history queries will return `NotYetWritten` (like `MDBX`) instead of
/// `MaybeInPlainState` when querying before the first history entry. Use this in tests
/// where `RocksDB` and `MDBX` have identical data.
pub const fn with_assume_history_complete(mut self) -> Self {
self.assume_history_complete = true;
self
@@ -1793,19 +1714,22 @@ impl<'db> RocksTx<'db> {
where
T: Table<Value = BlockNumberList>,
{
// When RocksDB can't find history for an address, we must fall back to plain state.
// Determines whether to soften NotYetWritten -> MaybeInPlainState.
//
// This is critical for hybrid storage: RocksDB only contains history indices for blocks
// written AFTER it was enabled. Accounts modified before that exist only in MDBX.
// Returning `NotYetWritten` would incorrectly treat them as non-existent (nonce 0).
// We soften when:
// 1. `assume_history_complete` is false (hybrid storage - RocksDB may not have full
// history)
// 2. OR history may be pruned (`lowest_available_block_number.is_some()`)
//
// When `assume_history_complete` is true (for tests with identical data), return
// `NotYetWritten` to match MDBX semantics.
// We only return NotYetWritten when we're certain history is complete AND not pruned.
let should_soften_not_yet_written =
!self.assume_history_complete || lowest_available_block_number.is_some();
let fallback = || {
Ok(if self.assume_history_complete {
HistoryInfo::NotYetWritten
} else {
Ok(if should_soften_not_yet_written {
HistoryInfo::MaybeInPlainState
} else {
HistoryInfo::NotYetWritten
})
};
@@ -1856,15 +1780,7 @@ impl<'db> RocksTx<'db> {
false
};
// For RocksDB, we must handle `is_before_first_write` specially.
//
// When RocksDB says the target block is before the first recorded write:
// - MDBX would return `NotYetWritten` (account doesn't exist) because MDBX has full history
// - RocksDB should return `MaybeInPlainState` because the account may exist in MDBX from
// before RocksDB was enabled
//
// We use `from_lookup` but then override `NotYetWritten` -> `MaybeInPlainState`
// unless `assume_history_complete` is true (for tests with identical data).
// Apply the same softening logic to `from_lookup` result.
let result = HistoryInfo::from_lookup(
found_block,
is_before_first_write,
@@ -1872,7 +1788,7 @@ impl<'db> RocksTx<'db> {
);
Ok(match result {
HistoryInfo::NotYetWritten if !self.assume_history_complete => {
HistoryInfo::NotYetWritten if should_soften_not_yet_written => {
HistoryInfo::MaybeInPlainState
}
other => other,
@@ -2372,7 +2288,7 @@ mod tests {
/// Tests the edge case where block < `lowest_available_block_number`.
/// This case cannot be tested via `HistoricalStateProviderRef` (which errors before lookup),
/// so we keep this `RocksDB`-specific test to verify the low-level behavior.
/// so we keep this RocksDB-specific test to verify the low-level behavior.
#[test]
fn test_account_history_info_pruned_before_first_entry() {
let temp_dir = TempDir::new().unwrap();

View File

@@ -32,13 +32,6 @@ pub struct RocksDBTableStats {
pub pending_compaction_bytes: u64,
}
/// Pending history writes (stub - empty struct).
#[derive(Debug, Default)]
pub(crate) struct PendingHistoryWrites;
/// Pending history writes type alias (stub).
pub(crate) type PendingHistory = Arc<Mutex<PendingHistoryWrites>>;
/// Context for `RocksDB` block writes (stub).
#[derive(Debug, Clone)]
#[allow(dead_code)]
@@ -51,8 +44,6 @@ pub(crate) struct RocksDBWriteCtx {
pub storage_settings: StorageSettings,
/// Pending batches (stub - unused).
pub pending_batches: PendingRocksDBBatches,
/// Pending history writes (stub - unused).
pub pending_history: PendingHistory,
}
/// A stub `RocksDB` provider.