Compare commits

...

5 Commits
main ... wt-pr2

Author SHA1 Message Date
yongkangc
6be57ce9e1 fix: address review nits 2026-01-22 12:26:42 +00:00
Georgios Konstantopoulos
b997bee71b fix: add backticks to remaining RocksDB in doc comments 2026-01-22 12:08:21 +00:00
Georgios Konstantopoulos
e3c7413747 fix: clippy warnings and add missing import for rocksdb provider 2026-01-22 12:08:19 +00:00
Georgios Konstantopoulos
0bf2335c8c fix(rocksdb): add assume_history_complete flag to fix test semantics
When testing RocksDB with identical data to MDBX, both backends should
return the same results. The MaybeInPlainState fallback (for hybrid
storage safety) was causing test failures because MDBX returns
NotYetWritten when querying before first history entry.

Added assume_history_complete flag to RocksTx that:
- When false (default): returns MaybeInPlainState for hybrid storage
- When true: returns NotYetWritten to match MDBX semantics for tests

This preserves the correct hybrid storage behavior in production while
allowing tests with identical data to verify semantic equivalence.
2026-01-22 12:08:16 +00:00
yongkangc
1f7a5acae1 fix(rocksdb): correct history index divergence causing nonce errors after unwind
Fixes RocksDB history indices diverging from MDBX after unwind operations,
causing 'max nonce mismatch' errors during block replay.

Root causes:
1. write_account_history/write_storage_history included all accounts/slots
   instead of only those with actual changes (account-info or storage value)
2. history_info in RocksTx returned NotYetWritten for missing entries instead
   of MaybeInPlainState, incorrectly treating pre-RocksDB accounts as new
3. History writes accumulated during save_blocks calls without deferred commit
4. Sentinel shards (highest_block_number=u64::MAX) not properly handled in
   invariant checks

Changes:
- Filter account history to only info-changed or destroyed accounts
- Filter storage history to only actually-changed slots
- Always return MaybeInPlainState from history_info for missing entries
- Add PendingHistoryWrites to accumulate history across save_blocks calls
- Commit pending history after MDBX (not before) via commit_pending_history
- Check sentinel shard contents when validating/pruning history
2026-01-22 12:08:14 +00:00
6 changed files with 322 additions and 93 deletions

View File

@@ -1351,7 +1351,8 @@ mod rocksdb_tests {
// Run queries against both backends using EitherReader
let mdbx_ro = factory.database_provider_ro().unwrap();
let rocks_tx = rocks_provider.tx();
// Use `with_assume_history_complete()` since both backends have identical data
let rocks_tx = rocks_provider.tx().with_assume_history_complete();
for (i, query) in queries.iter().enumerate() {
// MDBX query via EitherReader
@@ -1443,7 +1444,8 @@ mod rocksdb_tests {
// Run queries against both backends using EitherReader
let mdbx_ro = factory.database_provider_ro().unwrap();
let rocks_tx = rocks_provider.tx();
// Use `with_assume_history_complete()` since both backends have identical data
let rocks_tx = rocks_provider.tx().with_assume_history_complete();
for (i, query) in queries.iter().enumerate() {
// MDBX query via EitherReader

View File

@@ -2,7 +2,7 @@ use crate::{
changesets_utils::StorageRevertsIter,
providers::{
database::{chain::ChainStorage, metrics},
rocksdb::{PendingRocksDBBatches, RocksDBProvider, RocksDBWriteCtx},
rocksdb::{PendingHistory, PendingRocksDBBatches, RocksDBProvider, RocksDBWriteCtx},
static_file::{StaticFileWriteCtx, StaticFileWriter},
NodeTypesForProvider, StaticFileProvider,
},
@@ -186,6 +186,10 @@ pub struct DatabaseProvider<TX, N: NodeTypes> {
/// Pending `RocksDB` batches to be committed at provider commit time.
#[cfg_attr(not(all(unix, feature = "rocksdb")), allow(dead_code))]
pending_rocksdb_batches: PendingRocksDBBatches,
/// Pending history writes accumulated across `save_blocks()` calls.
/// Materialized into `RocksDB` shards at commit time after MDBX commits.
#[cfg_attr(not(all(unix, feature = "rocksdb")), allow(dead_code))]
pending_rocksdb_history: PendingHistory,
/// Minimum distance from tip required for pruning
minimum_pruning_distance: u64,
/// Database provider metrics
@@ -204,6 +208,7 @@ impl<TX: Debug, N: NodeTypes> Debug for DatabaseProvider<TX, N> {
.field("rocksdb_provider", &self.rocksdb_provider)
.field("changeset_cache", &self.changeset_cache)
.field("pending_rocksdb_batches", &"<pending batches>")
.field("pending_rocksdb_history", &"<pending history>")
.field("minimum_pruning_distance", &self.minimum_pruning_distance)
.finish()
}
@@ -337,6 +342,7 @@ impl<TX: DbTxMut, N: NodeTypes> DatabaseProvider<TX, N> {
rocksdb_provider,
changeset_cache,
pending_rocksdb_batches: Default::default(),
pending_rocksdb_history: Default::default(),
minimum_pruning_distance: MINIMUM_PRUNING_DISTANCE,
metrics: metrics::DatabaseProviderMetrics::default(),
}
@@ -410,6 +416,7 @@ impl<TX: DbTx + DbTxMut + 'static, N: NodeTypesForProvider> DatabaseProvider<TX,
prune_tx_lookup: self.prune_modes.transaction_lookup,
storage_settings: self.cached_storage_settings(),
pending_batches: self.pending_rocksdb_batches.clone(),
pending_history: self.pending_rocksdb_history.clone(),
}
}
@@ -876,6 +883,7 @@ impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
rocksdb_provider,
changeset_cache,
pending_rocksdb_batches: Default::default(),
pending_rocksdb_history: Default::default(),
minimum_pruning_distance: MINIMUM_PRUNING_DISTANCE,
metrics: metrics::DatabaseProviderMetrics::default(),
}
@@ -3478,6 +3486,19 @@ impl<TX: DbTx + 'static, N: NodeTypes + 'static> DBProvider for DatabaseProvider
}
/// Commit database transaction, static files, and pending `RocksDB` batches.
///
/// # Commit Order (Critical for Correctness)
///
/// `RocksDB` history indices must commit AFTER MDBX changesets to prevent a race
/// condition where history indices point to non-existent changesets:
///
/// 1. Static files finalize (headers, transactions, receipts)
/// 2. MDBX commits (changesets become visible)
/// 3. `RocksDB` batches commit (non-history data like tx hashes)
/// 4. `RocksDB` history commits (account/storage history indices)
///
/// This ordering ensures that when a reader follows a `RocksDB` history index
/// to a changeset, the changeset exists in MDBX.
fn commit(self) -> ProviderResult<()> {
// For unwinding it makes more sense to commit the database first, since if
// it is interrupted before the static files commit, we can just
@@ -3488,10 +3509,15 @@ impl<TX: DbTx + 'static, N: NodeTypes + 'static> DBProvider for DatabaseProvider
#[cfg(all(unix, feature = "rocksdb"))]
{
// Commit non-history batches
let batches = std::mem::take(&mut *self.pending_rocksdb_batches.lock());
for batch in batches {
self.rocksdb_provider.commit_batch(batch)?;
}
// Commit pending history (after MDBX, so changesets exist)
let history = std::mem::take(&mut *self.pending_rocksdb_history.lock());
self.rocksdb_provider.commit_pending_history(history)?;
}
self.static_file_provider.commit()?;
@@ -3503,20 +3529,30 @@ impl<TX: DbTx + 'static, N: NodeTypes + 'static> DBProvider for DatabaseProvider
self.static_file_provider.finalize()?;
timings.sf = start.elapsed();
// CRITICAL: MDBX must commit BEFORE RocksDB history indices.
// This prevents a race where history says "look at changeset N" but
// the changeset doesn't exist yet.
let start = Instant::now();
self.tx.commit()?;
timings.mdbx = start.elapsed();
#[cfg(all(unix, feature = "rocksdb"))]
{
let start = Instant::now();
// Commit non-history batches (tx hashes, etc.)
let batches = std::mem::take(&mut *self.pending_rocksdb_batches.lock());
for batch in batches {
self.rocksdb_provider.commit_batch(batch)?;
}
// Commit pending history (after MDBX, so changesets exist)
let history = std::mem::take(&mut *self.pending_rocksdb_history.lock());
self.rocksdb_provider.commit_pending_history(history)?;
timings.rocksdb = start.elapsed();
}
let start = Instant::now();
self.tx.commit()?;
timings.mdbx = start.elapsed();
self.metrics.record_commit(&timings);
}

View File

@@ -253,14 +253,21 @@ impl RocksDBProvider {
return Ok(None);
}
// Find the max highest_block_number (excluding u64::MAX sentinel) across all
// entries. Also track if we found any non-sentinel entries.
// Find the max block number across all entries, including sentinel shards.
let mut max_highest_block = 0u64;
let mut max_sentinel_block = 0u64;
let mut found_non_sentinel = false;
for result in self.iter::<tables::StoragesHistory>()? {
let (key, _) = result?;
let (key, value) = result?;
let highest = key.sharded_key.highest_block_number;
if highest != u64::MAX {
if highest == u64::MAX {
if let Some(max_in_shard) = value.iter().max() &&
max_in_shard > max_sentinel_block
{
max_sentinel_block = max_in_shard;
}
} else {
found_non_sentinel = true;
if highest > max_highest_block {
max_highest_block = highest;
@@ -268,31 +275,34 @@ impl RocksDBProvider {
}
}
// If all entries are sentinel entries (u64::MAX), treat as first-run scenario.
// This means no completed shards exist (only sentinel shards with
// highest_block_number=u64::MAX), so no actual history has been indexed.
if !found_non_sentinel {
let effective_max = max_highest_block.max(max_sentinel_block);
// If we only have sentinel shards and they're all empty, we're consistent
if !found_non_sentinel && max_sentinel_block == 0 {
return Ok(None);
}
// If any entry has highest_block > checkpoint, prune excess
if max_highest_block > checkpoint {
// If any entry has data > checkpoint, prune excess
if effective_max > checkpoint {
tracing::info!(
target: "reth::providers::rocksdb",
rocks_highest = max_highest_block,
rocks_highest = effective_max,
max_non_sentinel = max_highest_block,
max_sentinel = max_sentinel_block,
checkpoint,
"StoragesHistory ahead of checkpoint, pruning excess data"
);
self.prune_storages_history_above(checkpoint)?;
} else if max_highest_block < checkpoint {
return Ok(None);
} else if effective_max < checkpoint {
// RocksDB is behind checkpoint, return highest block to signal unwind needed
tracing::warn!(
target: "reth::providers::rocksdb",
rocks_highest = max_highest_block,
rocks_highest = effective_max,
checkpoint,
"StoragesHistory behind checkpoint, unwind needed"
);
return Ok(Some(max_highest_block));
return Ok(Some(effective_max));
}
Ok(None)
@@ -309,25 +319,42 @@ impl RocksDBProvider {
/// For `StoragesHistory`, the key contains `highest_block_number`, so we can iterate
/// and delete entries where `key.sharded_key.highest_block_number > max_block`.
///
/// Sentinel shards (with `highest_block_number = u64::MAX`) require special handling:
/// we must read their contents and filter out block numbers > `max_block`.
///
/// TODO(<https://github.com/paradigmxyz/reth/issues/20417>): this iterates the whole table,
/// which is inefficient. Use changeset-based pruning instead.
fn prune_storages_history_above(&self, max_block: BlockNumber) -> ProviderResult<()> {
use reth_db_api::models::storage_sharded_key::StorageShardedKey;
use reth_db_api::{models::storage_sharded_key::StorageShardedKey, BlockNumberList};
let mut to_delete: Vec<StorageShardedKey> = Vec::new();
let mut to_rewrite: Vec<(StorageShardedKey, BlockNumberList)> = Vec::new();
for result in self.iter::<tables::StoragesHistory>()? {
let (key, _) = result?;
let (key, value) = result?;
let highest_block = key.sharded_key.highest_block_number;
if max_block == 0 || (highest_block != u64::MAX && highest_block > max_block) {
if max_block == 0 {
to_delete.push(key);
} else if highest_block == u64::MAX {
let filtered: Vec<u64> = value.iter().filter(|&bn| bn <= max_block).collect();
if filtered.is_empty() {
to_delete.push(key);
} else if filtered.len() != value.len() as usize {
to_rewrite.push((key, BlockNumberList::new_pre_sorted(filtered)));
}
} else if highest_block > max_block {
to_delete.push(key);
}
}
let deleted = to_delete.len();
if deleted > 0 {
let rewritten = to_rewrite.len();
if deleted > 0 || rewritten > 0 {
tracing::info!(
target: "reth::providers::rocksdb",
deleted_count = deleted,
rewritten_count = rewritten,
max_block,
"Pruning StoragesHistory entries"
);
@@ -336,6 +363,9 @@ impl RocksDBProvider {
for key in to_delete {
batch.delete::<tables::StoragesHistory>(key)?;
}
for (key, value) in to_rewrite {
batch.put::<tables::StoragesHistory>(key, &value)?;
}
batch.commit()?;
}
@@ -374,14 +404,24 @@ impl RocksDBProvider {
return Ok(None);
}
// Find the max highest_block_number (excluding u64::MAX sentinel) across all
// entries. Also track if we found any non-sentinel entries.
// Find the max block number across all entries, including sentinel shards.
// For sentinel shards (highest_block_number == u64::MAX), we need to look
// at the actual block numbers stored in the shard.
let mut max_highest_block = 0u64;
let mut max_sentinel_block = 0u64;
let mut found_non_sentinel = false;
for result in self.iter::<tables::AccountsHistory>()? {
let (key, _) = result?;
let (key, value) = result?;
let highest = key.highest_block_number;
if highest != u64::MAX {
if highest == u64::MAX {
// Sentinel shard: check the actual max block number in the shard
if let Some(max_in_shard) = value.iter().max() &&
max_in_shard > max_sentinel_block
{
max_sentinel_block = max_in_shard;
}
} else {
found_non_sentinel = true;
if highest > max_highest_block {
max_highest_block = highest;
@@ -389,18 +429,22 @@ impl RocksDBProvider {
}
}
// If all entries are sentinel entries (u64::MAX), treat as first-run scenario.
// This means no completed shards exist (only sentinel shards with
// highest_block_number=u64::MAX), so no actual history has been indexed.
if !found_non_sentinel {
// Use the higher of the two maxes
let effective_max = max_highest_block.max(max_sentinel_block);
// If we only have sentinel shards and they're all within checkpoint,
// we're consistent
if !found_non_sentinel && max_sentinel_block == 0 {
return Ok(None);
}
// If any entry has highest_block > checkpoint, prune excess
if max_highest_block > checkpoint {
// If any entry has data > checkpoint, prune excess
if effective_max > checkpoint {
tracing::info!(
target: "reth::providers::rocksdb",
rocks_highest = max_highest_block,
rocks_highest = effective_max,
max_non_sentinel = max_highest_block,
max_sentinel = max_sentinel_block,
checkpoint,
"AccountsHistory ahead of checkpoint, pruning excess data"
);
@@ -409,14 +453,14 @@ impl RocksDBProvider {
}
// If RocksDB is behind the checkpoint, request an unwind to rebuild.
if max_highest_block < checkpoint {
if effective_max < checkpoint {
tracing::warn!(
target: "reth::providers::rocksdb",
rocks_highest = max_highest_block,
rocks_highest = effective_max,
checkpoint,
"AccountsHistory behind checkpoint, unwind needed"
);
return Ok(Some(max_highest_block));
return Ok(Some(effective_max));
}
Ok(None)
@@ -434,26 +478,47 @@ impl RocksDBProvider {
/// `highest_block_number`, so we can iterate and delete entries where
/// `key.highest_block_number > max_block`.
///
/// Sentinel shards (with `highest_block_number = u64::MAX`) require special handling:
/// we must read their contents and filter out block numbers > `max_block`.
///
/// TODO(<https://github.com/paradigmxyz/reth/issues/20417>): this iterates the whole table,
/// which is inefficient. Use changeset-based pruning instead.
fn prune_accounts_history_above(&self, max_block: BlockNumber) -> ProviderResult<()> {
use alloy_primitives::Address;
use reth_db_api::models::ShardedKey;
use reth_db_api::{models::ShardedKey, BlockNumberList};
let mut to_delete: Vec<ShardedKey<Address>> = Vec::new();
let mut to_rewrite: Vec<(ShardedKey<Address>, BlockNumberList)> = Vec::new();
for result in self.iter::<tables::AccountsHistory>()? {
let (key, _) = result?;
let (key, value) = result?;
let highest_block = key.highest_block_number;
if max_block == 0 || (highest_block != u64::MAX && highest_block > max_block) {
if max_block == 0 {
// Clear everything
to_delete.push(key);
} else if highest_block == u64::MAX {
// Sentinel shard: filter out block numbers > max_block
let filtered: Vec<u64> = value.iter().filter(|&bn| bn <= max_block).collect();
if filtered.is_empty() {
to_delete.push(key);
} else if filtered.len() != value.len() as usize {
// Some entries were filtered out, rewrite the shard
to_rewrite.push((key, BlockNumberList::new_pre_sorted(filtered)));
}
} else if highest_block > max_block {
// Non-sentinel shard above max_block: delete
to_delete.push(key);
}
}
let deleted = to_delete.len();
if deleted > 0 {
let rewritten = to_rewrite.len();
if deleted > 0 || rewritten > 0 {
tracing::info!(
target: "reth::providers::rocksdb",
deleted_count = deleted,
rewritten_count = rewritten,
max_block,
"Pruning AccountsHistory entries"
);
@@ -462,6 +527,9 @@ impl RocksDBProvider {
for key in to_delete {
batch.delete::<tables::AccountsHistory>(key)?;
}
for (key, value) in to_rewrite {
batch.put::<tables::AccountsHistory>(key, &value)?;
}
batch.commit()?;
}
@@ -996,6 +1064,7 @@ mod tests {
// This simulates a scenario where history tracking started but no shards were completed
let key_sentinel_1 = StorageShardedKey::new(Address::ZERO, B256::ZERO, u64::MAX);
let key_sentinel_2 = StorageShardedKey::new(Address::random(), B256::random(), u64::MAX);
// Sentinel shards contain blocks [10, 20, 30], so max block in shard = 30
let block_list = BlockNumberList::new_pre_sorted([10, 20, 30]);
rocksdb.put::<tables::StoragesHistory>(key_sentinel_1, &block_list).unwrap();
rocksdb.put::<tables::StoragesHistory>(key_sentinel_2, &block_list).unwrap();
@@ -1009,24 +1078,21 @@ mod tests {
StorageSettings::legacy().with_storages_history_in_rocksdb(true),
);
// Set a checkpoint indicating we should have processed up to block 100
// Checkpoint = 30 matches sentinel shard max block, simulating "consistent" state
// where indexing is complete through the checkpoint block
{
let provider = factory.database_provider_rw().unwrap();
provider
.save_stage_checkpoint(StageId::IndexStorageHistory, StageCheckpoint::new(100))
.save_stage_checkpoint(StageId::IndexStorageHistory, StageCheckpoint::new(30))
.unwrap();
provider.commit().unwrap();
}
let provider = factory.database_provider_ro().unwrap();
// RocksDB has only sentinel entries (no completed shards) but checkpoint is set.
// This is treated as a first-run/migration scenario - no unwind needed.
// RocksDB has sentinel entries matching the checkpoint - consistent state
let result = rocksdb.check_consistency(&provider).unwrap();
assert_eq!(
result, None,
"Sentinel-only entries with checkpoint should be treated as first run"
);
assert_eq!(result, None, "Sentinel-only entries matching checkpoint should be consistent");
}
#[test]
@@ -1042,6 +1108,7 @@ mod tests {
// Insert ONLY sentinel entries (highest_block_number = u64::MAX)
let key_sentinel_1 = ShardedKey::new(Address::ZERO, u64::MAX);
let key_sentinel_2 = ShardedKey::new(Address::random(), u64::MAX);
// Sentinel shards contain blocks [10, 20, 30], so max block in shard = 30
let block_list = BlockNumberList::new_pre_sorted([10, 20, 30]);
rocksdb.put::<tables::AccountsHistory>(key_sentinel_1, &block_list).unwrap();
rocksdb.put::<tables::AccountsHistory>(key_sentinel_2, &block_list).unwrap();
@@ -1055,24 +1122,21 @@ mod tests {
StorageSettings::legacy().with_account_history_in_rocksdb(true),
);
// Set a checkpoint indicating we should have processed up to block 100
// Checkpoint = 30 matches sentinel shard max block, simulating "consistent" state
// where indexing is complete through the checkpoint block
{
let provider = factory.database_provider_rw().unwrap();
provider
.save_stage_checkpoint(StageId::IndexAccountHistory, StageCheckpoint::new(100))
.save_stage_checkpoint(StageId::IndexAccountHistory, StageCheckpoint::new(30))
.unwrap();
provider.commit().unwrap();
}
let provider = factory.database_provider_ro().unwrap();
// RocksDB has only sentinel entries (no completed shards) but checkpoint is set.
// This is treated as a first-run/migration scenario - no unwind needed.
// RocksDB has sentinel entries matching the checkpoint - consistent state
let result = rocksdb.check_consistency(&provider).unwrap();
assert_eq!(
result, None,
"Sentinel-only entries with checkpoint should be treated as first run"
);
assert_eq!(result, None, "Sentinel-only entries matching checkpoint should be consistent");
}
#[test]

View File

@@ -4,7 +4,10 @@ mod invariants;
mod metrics;
mod provider;
pub(crate) use provider::{PendingRocksDBBatches, RocksDBWriteCtx};
#[allow(unused_imports)]
pub(crate) use provider::{
PendingHistory, PendingHistoryWrites, PendingRocksDBBatches, RocksDBWriteCtx,
};
pub use provider::{
RocksDBBatch, RocksDBBuilder, RocksDBProvider, RocksDBRawIter, RocksDBTableStats, RocksTx,
};

View File

@@ -15,7 +15,7 @@ use reth_db_api::{
table::{Compress, Decode, Decompress, Encode, Table},
tables, BlockNumberList, DatabaseError,
};
use reth_primitives_traits::BlockBody as _;
use reth_primitives_traits::{AlloyBlockHeader as _, BlockBody as _};
use reth_prune_types::PruneMode;
use reth_storage_errors::{
db::{DatabaseErrorInfo, DatabaseWriteError, DatabaseWriteOperation, LogLevel},
@@ -57,6 +57,24 @@ pub struct RocksDBTableStats {
pub pending_compaction_bytes: u64,
}
/// Pending history writes that are accumulated across multiple `save_blocks()` calls
/// and materialized into `RocksDB` shards at commit time.
///
/// This exists because [`RocksDBBatch::append_account_history_shard`] and
/// [`RocksDBBatch::append_storage_history_shard`] read from committed state, not pending writes.
/// By accumulating all history deltas in memory and materializing once at commit time, we ensure
/// each key is written exactly once with all its indices.
#[derive(Debug, Default)]
pub(crate) struct PendingHistoryWrites {
/// Account history: address -> list of block numbers where the account was modified.
pub accounts: BTreeMap<Address, Vec<u64>>,
/// Storage history: (address, slot) -> list of block numbers where the storage was modified.
pub storages: BTreeMap<(Address, B256), Vec<u64>>,
}
/// Pending history writes type alias.
pub(crate) type PendingHistory = Arc<Mutex<PendingHistoryWrites>>;
/// Context for `RocksDB` block writes.
#[derive(Clone)]
pub(crate) struct RocksDBWriteCtx {
@@ -68,6 +86,9 @@ pub(crate) struct RocksDBWriteCtx {
pub storage_settings: StorageSettings,
/// Pending batches to push to after writing.
pub pending_batches: PendingRocksDBBatches,
/// Pending history writes accumulated across `save_blocks()` calls.
/// Materialized into `RocksDB` shards at commit time.
pub pending_history: PendingHistory,
}
impl fmt::Debug for RocksDBWriteCtx {
@@ -77,6 +98,7 @@ impl fmt::Debug for RocksDBWriteCtx {
.field("prune_tx_lookup", &self.prune_tx_lookup)
.field("storage_settings", &self.storage_settings)
.field("pending_batches", &"<pending batches>")
.field("pending_history", &"<pending history>")
.finish()
}
}
@@ -594,7 +616,7 @@ impl RocksDBProvider {
let write_options = WriteOptions::default();
let txn_options = OptimisticTransactionOptions::default();
let inner = self.0.db_rw().transaction_opt(&write_options, &txn_options);
RocksTx { inner, provider: self }
RocksTx { inner, provider: self, assume_history_complete: false }
}
/// Creates a new batch for atomic writes.
@@ -1057,56 +1079,109 @@ impl RocksDBProvider {
Ok(())
}
/// Writes account history indices for the given blocks.
/// Accumulates account history indices for the given blocks.
///
/// Indices are accumulated in `ctx.pending_history` and materialized at commit time
/// by [`RocksDBProvider::commit_pending_history`]. This ensures each key is written
/// exactly once, working around `RocksDB` batch reads not seeing uncommitted writes.
///
/// Only accounts with info changes (nonce, balance, `code_hash`) or lifecycle changes
/// (created/destroyed) are included, matching MDBX `AccountChangeSets` behavior.
#[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
fn write_account_history<N: reth_node_types::NodePrimitives>(
&self,
blocks: &[ExecutedBlock<N>],
ctx: &RocksDBWriteCtx,
) -> ProviderResult<()> {
let mut batch = self.batch();
let mut account_history: BTreeMap<Address, Vec<u64>> = BTreeMap::new();
for (block_idx, block) in blocks.iter().enumerate() {
let block_number = ctx.first_block_number + block_idx as u64;
let mut local: BTreeMap<Address, Vec<u64>> = BTreeMap::new();
for block in blocks {
let block_number = block.recovered_block().number();
let bundle = &block.execution_outcome().state;
for &address in bundle.state().keys() {
account_history.entry(address).or_default().push(block_number);
for (&address, account) in bundle.state() {
let info_changed = account.is_info_changed();
let was_destroyed = account.was_destroyed();
if info_changed || was_destroyed {
local.entry(address).or_default().push(block_number);
}
}
}
// Write account history using proper shard append logic
for (address, indices) in account_history {
batch.append_account_history_shard(address, indices)?;
let mut pending = ctx.pending_history.lock();
for (address, mut indices) in local {
pending.accounts.entry(address).or_default().append(&mut indices);
}
ctx.pending_batches.lock().push(batch.into_inner());
Ok(())
}
/// Writes storage history indices for the given blocks.
/// Accumulates storage history indices for the given blocks.
///
/// Indices are accumulated in `ctx.pending_history` and materialized at commit time
/// by [`RocksDBProvider::commit_pending_history`]. This ensures each key is written
/// exactly once, working around `RocksDB` batch reads not seeing uncommitted writes.
///
/// Only storage slots with actual value changes are included, matching MDBX behavior.
#[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
fn write_storage_history<N: reth_node_types::NodePrimitives>(
&self,
blocks: &[ExecutedBlock<N>],
ctx: &RocksDBWriteCtx,
) -> ProviderResult<()> {
let mut batch = self.batch();
let mut storage_history: BTreeMap<(Address, B256), Vec<u64>> = BTreeMap::new();
for (block_idx, block) in blocks.iter().enumerate() {
let block_number = ctx.first_block_number + block_idx as u64;
let mut local: BTreeMap<(Address, B256), Vec<u64>> = BTreeMap::new();
for block in blocks {
let block_number = block.recovered_block().number();
let bundle = &block.execution_outcome().state;
for (&address, account) in bundle.state() {
for &slot in account.storage.keys() {
let key = B256::new(slot.to_be_bytes());
storage_history.entry((address, key)).or_default().push(block_number);
for (&slot, storage_slot) in &account.storage {
if storage_slot.is_changed() {
let key = B256::new(slot.to_be_bytes());
local.entry((address, key)).or_default().push(block_number);
}
}
}
}
// Write storage history using proper shard append logic
for ((address, slot), indices) in storage_history {
let mut pending = ctx.pending_history.lock();
for ((address, slot), mut indices) in local {
pending.storages.entry((address, slot)).or_default().append(&mut indices);
}
Ok(())
}
/// Materializes pending history writes into `RocksDB` shards and commits them.
///
/// This is called at provider commit time, AFTER MDBX has committed. Each key is
/// written exactly once, ensuring [`RocksDBBatch::append_account_history_shard`] and
/// [`RocksDBBatch::append_storage_history_shard`] see committed state.
pub(crate) fn commit_pending_history(
&self,
history: PendingHistoryWrites,
) -> ProviderResult<()> {
if history.accounts.is_empty() && history.storages.is_empty() {
return Ok(());
}
let mut batch = self.batch();
// Write account history shards
for (address, mut indices) in history.accounts {
// Sort and dedup in case of duplicate block numbers from multiple save_blocks calls
indices.sort_unstable();
indices.dedup();
batch.append_account_history_shard(address, indices)?;
}
// Write storage history shards
for ((address, slot), mut indices) in history.storages {
indices.sort_unstable();
indices.dedup();
batch.append_storage_history_shard(address, slot, indices)?;
}
ctx.pending_batches.lock().push(batch.into_inner());
if !batch.is_empty() {
batch.commit()?;
}
Ok(())
}
}
@@ -1521,10 +1596,14 @@ impl<'a> RocksDBBatch<'a> {
/// - Iteration over uncommitted data
///
/// Note: `Transaction` is `Send` but NOT `Sync`. This wrapper does not implement
/// `DbTx`/`DbTxMut` traits directly; use RocksDB-specific methods instead.
/// `DbTx`/`DbTxMut` traits directly; use `RocksDB`-specific methods instead.
pub struct RocksTx<'db> {
inner: Transaction<'db, OptimisticTransactionDB>,
provider: &'db RocksDBProvider,
/// When true, assume `RocksDB` has complete history (like MDBX) and return `NotYetWritten`
/// when querying before the first history entry. When false (default), return
/// `MaybeInPlainState` for hybrid storage safety.
assume_history_complete: bool,
}
impl fmt::Debug for RocksTx<'_> {
@@ -1534,6 +1613,20 @@ impl fmt::Debug for RocksTx<'_> {
}
impl<'db> RocksTx<'db> {
/// Sets the `assume_history_complete` flag to true.
///
/// When enabled, history queries will return `NotYetWritten` (like MDBX) instead of
/// `MaybeInPlainState` when querying before the first history entry.
///
/// WARNING: Only use in tests where `RocksDB` has complete history (identical to MDBX).
/// In production, `RocksDB` may have partial history, so this flag would cause incorrect
/// behavior (treating accounts as non-existent when they exist in MDBX).
#[cfg(test)]
pub const fn with_assume_history_complete(mut self) -> Self {
self.assume_history_complete = true;
self
}
/// Gets a value from the specified table. Sees uncommitted writes in this transaction.
pub fn get<T: Table>(&self, key: T::Key) -> ProviderResult<Option<T::Value>> {
let encoded_key = key.encode();
@@ -1700,13 +1793,19 @@ impl<'db> RocksTx<'db> {
where
T: Table<Value = BlockNumberList>,
{
// History may be pruned if a lowest available block is set.
let is_maybe_pruned = lowest_available_block_number.is_some();
// When RocksDB can't find history for an address, we must fall back to plain state.
//
// This is critical for hybrid storage: RocksDB only contains history indices for blocks
// written AFTER it was enabled. Accounts modified before that exist only in MDBX.
// Returning `NotYetWritten` would incorrectly treat them as non-existent (nonce 0).
//
// When `assume_history_complete` is true (for tests with identical data), return
// `NotYetWritten` to match MDBX semantics.
let fallback = || {
Ok(if is_maybe_pruned {
HistoryInfo::MaybeInPlainState
} else {
Ok(if self.assume_history_complete {
HistoryInfo::NotYetWritten
} else {
HistoryInfo::MaybeInPlainState
})
};
@@ -1757,11 +1856,27 @@ impl<'db> RocksTx<'db> {
false
};
Ok(HistoryInfo::from_lookup(
// For RocksDB, we must handle `is_before_first_write` specially.
//
// When RocksDB says the target block is before the first recorded write:
// - MDBX would return `NotYetWritten` (account doesn't exist) because MDBX has full history
// - RocksDB should return `MaybeInPlainState` because the account may exist in MDBX from
// before RocksDB was enabled
//
// We use `from_lookup` but then override `NotYetWritten` -> `MaybeInPlainState`
// unless `assume_history_complete` is true (for tests with identical data).
let result = HistoryInfo::from_lookup(
found_block,
is_before_first_write,
lowest_available_block_number,
))
);
Ok(match result {
HistoryInfo::NotYetWritten if !self.assume_history_complete => {
HistoryInfo::MaybeInPlainState
}
other => other,
})
}
/// Returns an error if the raw iterator is in an invalid state due to an I/O error.
@@ -2257,7 +2372,7 @@ mod tests {
/// Tests the edge case where block < `lowest_available_block_number`.
/// This case cannot be tested via `HistoricalStateProviderRef` (which errors before lookup),
/// so we keep this RocksDB-specific test to verify the low-level behavior.
/// so we keep this `RocksDB`-specific test to verify the low-level behavior.
#[test]
fn test_account_history_info_pruned_before_first_entry() {
let temp_dir = TempDir::new().unwrap();

View File

@@ -32,6 +32,13 @@ pub struct RocksDBTableStats {
pub pending_compaction_bytes: u64,
}
/// Pending history writes (stub - empty struct).
#[derive(Debug, Default)]
pub(crate) struct PendingHistoryWrites;
/// Pending history writes type alias (stub).
pub(crate) type PendingHistory = Arc<Mutex<PendingHistoryWrites>>;
/// Context for `RocksDB` block writes (stub).
#[derive(Debug, Clone)]
#[allow(dead_code)]
@@ -44,6 +51,8 @@ pub(crate) struct RocksDBWriteCtx {
pub storage_settings: StorageSettings,
/// Pending batches (stub - unused).
pub pending_batches: PendingRocksDBBatches,
/// Pending history writes (stub - unused).
pub pending_history: PendingHistory,
}
/// A stub `RocksDB` provider.