Compare commits

..

1 Commits

Author SHA1 Message Date
yongkangc
082daa7fc1 fix(rocksdb): filter account/storage history to match MDBX semantics 2026-01-22 12:58:59 +00:00
6 changed files with 97 additions and 316 deletions

View File

@@ -1351,8 +1351,7 @@ mod rocksdb_tests {
// Run queries against both backends using EitherReader
let mdbx_ro = factory.database_provider_ro().unwrap();
// Use `with_assume_history_complete()` since both backends have identical data
let rocks_tx = rocks_provider.tx().with_assume_history_complete();
let rocks_tx = rocks_provider.tx();
for (i, query) in queries.iter().enumerate() {
// MDBX query via EitherReader
@@ -1444,8 +1443,7 @@ mod rocksdb_tests {
// Run queries against both backends using EitherReader
let mdbx_ro = factory.database_provider_ro().unwrap();
// Use `with_assume_history_complete()` since both backends have identical data
let rocks_tx = rocks_provider.tx().with_assume_history_complete();
let rocks_tx = rocks_provider.tx();
for (i, query) in queries.iter().enumerate() {
// MDBX query via EitherReader

View File

@@ -2,7 +2,7 @@ use crate::{
changesets_utils::StorageRevertsIter,
providers::{
database::{chain::ChainStorage, metrics},
rocksdb::{PendingHistory, PendingRocksDBBatches, RocksDBProvider, RocksDBWriteCtx},
rocksdb::{PendingRocksDBBatches, RocksDBProvider, RocksDBWriteCtx},
static_file::{StaticFileWriteCtx, StaticFileWriter},
NodeTypesForProvider, StaticFileProvider,
},
@@ -186,10 +186,6 @@ pub struct DatabaseProvider<TX, N: NodeTypes> {
/// Pending `RocksDB` batches to be committed at provider commit time.
#[cfg_attr(not(all(unix, feature = "rocksdb")), allow(dead_code))]
pending_rocksdb_batches: PendingRocksDBBatches,
/// Pending history writes accumulated across `save_blocks()` calls.
/// Materialized into `RocksDB` shards at commit time after MDBX commits.
#[cfg_attr(not(all(unix, feature = "rocksdb")), allow(dead_code))]
pending_rocksdb_history: PendingHistory,
/// Minimum distance from tip required for pruning
minimum_pruning_distance: u64,
/// Database provider metrics
@@ -208,7 +204,6 @@ impl<TX: Debug, N: NodeTypes> Debug for DatabaseProvider<TX, N> {
.field("rocksdb_provider", &self.rocksdb_provider)
.field("changeset_cache", &self.changeset_cache)
.field("pending_rocksdb_batches", &"<pending batches>")
.field("pending_rocksdb_history", &"<pending history>")
.field("minimum_pruning_distance", &self.minimum_pruning_distance)
.finish()
}
@@ -342,7 +337,6 @@ impl<TX: DbTxMut, N: NodeTypes> DatabaseProvider<TX, N> {
rocksdb_provider,
changeset_cache,
pending_rocksdb_batches: Default::default(),
pending_rocksdb_history: Default::default(),
minimum_pruning_distance: MINIMUM_PRUNING_DISTANCE,
metrics: metrics::DatabaseProviderMetrics::default(),
}
@@ -416,7 +410,6 @@ impl<TX: DbTx + DbTxMut + 'static, N: NodeTypesForProvider> DatabaseProvider<TX,
prune_tx_lookup: self.prune_modes.transaction_lookup,
storage_settings: self.cached_storage_settings(),
pending_batches: self.pending_rocksdb_batches.clone(),
pending_history: self.pending_rocksdb_history.clone(),
}
}
@@ -883,7 +876,6 @@ impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
rocksdb_provider,
changeset_cache,
pending_rocksdb_batches: Default::default(),
pending_rocksdb_history: Default::default(),
minimum_pruning_distance: MINIMUM_PRUNING_DISTANCE,
metrics: metrics::DatabaseProviderMetrics::default(),
}
@@ -3486,19 +3478,6 @@ impl<TX: DbTx + 'static, N: NodeTypes + 'static> DBProvider for DatabaseProvider
}
/// Commit database transaction, static files, and pending `RocksDB` batches.
///
/// # Commit Order (Critical for Correctness)
///
/// `RocksDB` history indices must commit AFTER MDBX changesets to prevent a race
/// condition where history indices point to non-existent changesets:
///
/// 1. Static files finalize (headers, transactions, receipts)
/// 2. MDBX commits (changesets become visible)
/// 3. `RocksDB` batches commit (non-history data like tx hashes)
/// 4. `RocksDB` history commits (account/storage history indices)
///
/// This ordering ensures that when a reader follows a `RocksDB` history index
/// to a changeset, the changeset exists in MDBX.
fn commit(self) -> ProviderResult<()> {
// For unwinding it makes more sense to commit the database first, since if
// it is interrupted before the static files commit, we can just
@@ -3509,15 +3488,10 @@ impl<TX: DbTx + 'static, N: NodeTypes + 'static> DBProvider for DatabaseProvider
#[cfg(all(unix, feature = "rocksdb"))]
{
// Commit non-history batches
let batches = std::mem::take(&mut *self.pending_rocksdb_batches.lock());
for batch in batches {
self.rocksdb_provider.commit_batch(batch)?;
}
// Commit pending history (after MDBX, so changesets exist)
let history = std::mem::take(&mut *self.pending_rocksdb_history.lock());
self.rocksdb_provider.commit_pending_history(history)?;
}
self.static_file_provider.commit()?;
@@ -3529,30 +3503,20 @@ impl<TX: DbTx + 'static, N: NodeTypes + 'static> DBProvider for DatabaseProvider
self.static_file_provider.finalize()?;
timings.sf = start.elapsed();
// CRITICAL: MDBX must commit BEFORE RocksDB history indices.
// This prevents a race where history says "look at changeset N" but
// the changeset doesn't exist yet.
let start = Instant::now();
self.tx.commit()?;
timings.mdbx = start.elapsed();
#[cfg(all(unix, feature = "rocksdb"))]
{
let start = Instant::now();
// Commit non-history batches (tx hashes, etc.)
let batches = std::mem::take(&mut *self.pending_rocksdb_batches.lock());
for batch in batches {
self.rocksdb_provider.commit_batch(batch)?;
}
// Commit pending history (after MDBX, so changesets exist)
let history = std::mem::take(&mut *self.pending_rocksdb_history.lock());
self.rocksdb_provider.commit_pending_history(history)?;
timings.rocksdb = start.elapsed();
}
let start = Instant::now();
self.tx.commit()?;
timings.mdbx = start.elapsed();
self.metrics.record_commit(&timings);
}

View File

@@ -253,21 +253,14 @@ impl RocksDBProvider {
return Ok(None);
}
// Find the max block number across all entries, including sentinel shards.
// Find the max highest_block_number (excluding u64::MAX sentinel) across all
// entries. Also track if we found any non-sentinel entries.
let mut max_highest_block = 0u64;
let mut max_sentinel_block = 0u64;
let mut found_non_sentinel = false;
for result in self.iter::<tables::StoragesHistory>()? {
let (key, value) = result?;
let (key, _) = result?;
let highest = key.sharded_key.highest_block_number;
if highest == u64::MAX {
if let Some(max_in_shard) = value.iter().max() &&
max_in_shard > max_sentinel_block
{
max_sentinel_block = max_in_shard;
}
} else {
if highest != u64::MAX {
found_non_sentinel = true;
if highest > max_highest_block {
max_highest_block = highest;
@@ -275,34 +268,31 @@ impl RocksDBProvider {
}
}
let effective_max = max_highest_block.max(max_sentinel_block);
// If we only have sentinel shards and they're all empty, we're consistent
if !found_non_sentinel && max_sentinel_block == 0 {
// If all entries are sentinel entries (u64::MAX), treat as first-run scenario.
// This means no completed shards exist (only sentinel shards with
// highest_block_number=u64::MAX), so no actual history has been indexed.
if !found_non_sentinel {
return Ok(None);
}
// If any entry has data > checkpoint, prune excess
if effective_max > checkpoint {
// If any entry has highest_block > checkpoint, prune excess
if max_highest_block > checkpoint {
tracing::info!(
target: "reth::providers::rocksdb",
rocks_highest = effective_max,
max_non_sentinel = max_highest_block,
max_sentinel = max_sentinel_block,
rocks_highest = max_highest_block,
checkpoint,
"StoragesHistory ahead of checkpoint, pruning excess data"
);
self.prune_storages_history_above(checkpoint)?;
return Ok(None);
} else if effective_max < checkpoint {
} else if max_highest_block < checkpoint {
// RocksDB is behind checkpoint, return highest block to signal unwind needed
tracing::warn!(
target: "reth::providers::rocksdb",
rocks_highest = effective_max,
rocks_highest = max_highest_block,
checkpoint,
"StoragesHistory behind checkpoint, unwind needed"
);
return Ok(Some(effective_max));
return Ok(Some(max_highest_block));
}
Ok(None)
@@ -319,42 +309,25 @@ impl RocksDBProvider {
/// For `StoragesHistory`, the key contains `highest_block_number`, so we can iterate
/// and delete entries where `key.sharded_key.highest_block_number > max_block`.
///
/// Sentinel shards (with `highest_block_number = u64::MAX`) require special handling:
/// we must read their contents and filter out block numbers > `max_block`.
///
/// TODO(<https://github.com/paradigmxyz/reth/issues/20417>): this iterates the whole table,
/// which is inefficient. Use changeset-based pruning instead.
fn prune_storages_history_above(&self, max_block: BlockNumber) -> ProviderResult<()> {
use reth_db_api::{models::storage_sharded_key::StorageShardedKey, BlockNumberList};
use reth_db_api::models::storage_sharded_key::StorageShardedKey;
let mut to_delete: Vec<StorageShardedKey> = Vec::new();
let mut to_rewrite: Vec<(StorageShardedKey, BlockNumberList)> = Vec::new();
for result in self.iter::<tables::StoragesHistory>()? {
let (key, value) = result?;
let (key, _) = result?;
let highest_block = key.sharded_key.highest_block_number;
if max_block == 0 {
to_delete.push(key);
} else if highest_block == u64::MAX {
let filtered: Vec<u64> = value.iter().filter(|&bn| bn <= max_block).collect();
if filtered.is_empty() {
to_delete.push(key);
} else if filtered.len() != value.len() as usize {
to_rewrite.push((key, BlockNumberList::new_pre_sorted(filtered)));
}
} else if highest_block > max_block {
if max_block == 0 || (highest_block != u64::MAX && highest_block > max_block) {
to_delete.push(key);
}
}
let deleted = to_delete.len();
let rewritten = to_rewrite.len();
if deleted > 0 || rewritten > 0 {
if deleted > 0 {
tracing::info!(
target: "reth::providers::rocksdb",
deleted_count = deleted,
rewritten_count = rewritten,
max_block,
"Pruning StoragesHistory entries"
);
@@ -363,9 +336,6 @@ impl RocksDBProvider {
for key in to_delete {
batch.delete::<tables::StoragesHistory>(key)?;
}
for (key, value) in to_rewrite {
batch.put::<tables::StoragesHistory>(key, &value)?;
}
batch.commit()?;
}
@@ -404,24 +374,14 @@ impl RocksDBProvider {
return Ok(None);
}
// Find the max block number across all entries, including sentinel shards.
// For sentinel shards (highest_block_number == u64::MAX), we need to look
// at the actual block numbers stored in the shard.
// Find the max highest_block_number (excluding u64::MAX sentinel) across all
// entries. Also track if we found any non-sentinel entries.
let mut max_highest_block = 0u64;
let mut max_sentinel_block = 0u64;
let mut found_non_sentinel = false;
for result in self.iter::<tables::AccountsHistory>()? {
let (key, value) = result?;
let (key, _) = result?;
let highest = key.highest_block_number;
if highest == u64::MAX {
// Sentinel shard: check the actual max block number in the shard
if let Some(max_in_shard) = value.iter().max() &&
max_in_shard > max_sentinel_block
{
max_sentinel_block = max_in_shard;
}
} else {
if highest != u64::MAX {
found_non_sentinel = true;
if highest > max_highest_block {
max_highest_block = highest;
@@ -429,22 +389,18 @@ impl RocksDBProvider {
}
}
// Use the higher of the two maxes
let effective_max = max_highest_block.max(max_sentinel_block);
// If we only have sentinel shards and they're all within checkpoint,
// we're consistent
if !found_non_sentinel && max_sentinel_block == 0 {
// If all entries are sentinel entries (u64::MAX), treat as first-run scenario.
// This means no completed shards exist (only sentinel shards with
// highest_block_number=u64::MAX), so no actual history has been indexed.
if !found_non_sentinel {
return Ok(None);
}
// If any entry has data > checkpoint, prune excess
if effective_max > checkpoint {
// If any entry has highest_block > checkpoint, prune excess
if max_highest_block > checkpoint {
tracing::info!(
target: "reth::providers::rocksdb",
rocks_highest = effective_max,
max_non_sentinel = max_highest_block,
max_sentinel = max_sentinel_block,
rocks_highest = max_highest_block,
checkpoint,
"AccountsHistory ahead of checkpoint, pruning excess data"
);
@@ -453,14 +409,14 @@ impl RocksDBProvider {
}
// If RocksDB is behind the checkpoint, request an unwind to rebuild.
if effective_max < checkpoint {
if max_highest_block < checkpoint {
tracing::warn!(
target: "reth::providers::rocksdb",
rocks_highest = effective_max,
rocks_highest = max_highest_block,
checkpoint,
"AccountsHistory behind checkpoint, unwind needed"
);
return Ok(Some(effective_max));
return Ok(Some(max_highest_block));
}
Ok(None)
@@ -478,47 +434,26 @@ impl RocksDBProvider {
/// `highest_block_number`, so we can iterate and delete entries where
/// `key.highest_block_number > max_block`.
///
/// Sentinel shards (with `highest_block_number = u64::MAX`) require special handling:
/// we must read their contents and filter out block numbers > `max_block`.
///
/// TODO(<https://github.com/paradigmxyz/reth/issues/20417>): this iterates the whole table,
/// which is inefficient. Use changeset-based pruning instead.
fn prune_accounts_history_above(&self, max_block: BlockNumber) -> ProviderResult<()> {
use alloy_primitives::Address;
use reth_db_api::{models::ShardedKey, BlockNumberList};
use reth_db_api::models::ShardedKey;
let mut to_delete: Vec<ShardedKey<Address>> = Vec::new();
let mut to_rewrite: Vec<(ShardedKey<Address>, BlockNumberList)> = Vec::new();
for result in self.iter::<tables::AccountsHistory>()? {
let (key, value) = result?;
let (key, _) = result?;
let highest_block = key.highest_block_number;
if max_block == 0 {
// Clear everything
to_delete.push(key);
} else if highest_block == u64::MAX {
// Sentinel shard: filter out block numbers > max_block
let filtered: Vec<u64> = value.iter().filter(|&bn| bn <= max_block).collect();
if filtered.is_empty() {
to_delete.push(key);
} else if filtered.len() != value.len() as usize {
// Some entries were filtered out, rewrite the shard
to_rewrite.push((key, BlockNumberList::new_pre_sorted(filtered)));
}
} else if highest_block > max_block {
// Non-sentinel shard above max_block: delete
if max_block == 0 || (highest_block != u64::MAX && highest_block > max_block) {
to_delete.push(key);
}
}
let deleted = to_delete.len();
let rewritten = to_rewrite.len();
if deleted > 0 || rewritten > 0 {
if deleted > 0 {
tracing::info!(
target: "reth::providers::rocksdb",
deleted_count = deleted,
rewritten_count = rewritten,
max_block,
"Pruning AccountsHistory entries"
);
@@ -527,9 +462,6 @@ impl RocksDBProvider {
for key in to_delete {
batch.delete::<tables::AccountsHistory>(key)?;
}
for (key, value) in to_rewrite {
batch.put::<tables::AccountsHistory>(key, &value)?;
}
batch.commit()?;
}
@@ -1064,7 +996,6 @@ mod tests {
// This simulates a scenario where history tracking started but no shards were completed
let key_sentinel_1 = StorageShardedKey::new(Address::ZERO, B256::ZERO, u64::MAX);
let key_sentinel_2 = StorageShardedKey::new(Address::random(), B256::random(), u64::MAX);
// Sentinel shards contain blocks [10, 20, 30], so max block in shard = 30
let block_list = BlockNumberList::new_pre_sorted([10, 20, 30]);
rocksdb.put::<tables::StoragesHistory>(key_sentinel_1, &block_list).unwrap();
rocksdb.put::<tables::StoragesHistory>(key_sentinel_2, &block_list).unwrap();
@@ -1078,21 +1009,24 @@ mod tests {
StorageSettings::legacy().with_storages_history_in_rocksdb(true),
);
// Checkpoint = 30 matches sentinel shard max block, simulating "consistent" state
// where indexing is complete through the checkpoint block
// Set a checkpoint indicating we should have processed up to block 100
{
let provider = factory.database_provider_rw().unwrap();
provider
.save_stage_checkpoint(StageId::IndexStorageHistory, StageCheckpoint::new(30))
.save_stage_checkpoint(StageId::IndexStorageHistory, StageCheckpoint::new(100))
.unwrap();
provider.commit().unwrap();
}
let provider = factory.database_provider_ro().unwrap();
// RocksDB has sentinel entries matching the checkpoint - consistent state
// RocksDB has only sentinel entries (no completed shards) but checkpoint is set.
// This is treated as a first-run/migration scenario - no unwind needed.
let result = rocksdb.check_consistency(&provider).unwrap();
assert_eq!(result, None, "Sentinel-only entries matching checkpoint should be consistent");
assert_eq!(
result, None,
"Sentinel-only entries with checkpoint should be treated as first run"
);
}
#[test]
@@ -1108,7 +1042,6 @@ mod tests {
// Insert ONLY sentinel entries (highest_block_number = u64::MAX)
let key_sentinel_1 = ShardedKey::new(Address::ZERO, u64::MAX);
let key_sentinel_2 = ShardedKey::new(Address::random(), u64::MAX);
// Sentinel shards contain blocks [10, 20, 30], so max block in shard = 30
let block_list = BlockNumberList::new_pre_sorted([10, 20, 30]);
rocksdb.put::<tables::AccountsHistory>(key_sentinel_1, &block_list).unwrap();
rocksdb.put::<tables::AccountsHistory>(key_sentinel_2, &block_list).unwrap();
@@ -1122,21 +1055,24 @@ mod tests {
StorageSettings::legacy().with_account_history_in_rocksdb(true),
);
// Checkpoint = 30 matches sentinel shard max block, simulating "consistent" state
// where indexing is complete through the checkpoint block
// Set a checkpoint indicating we should have processed up to block 100
{
let provider = factory.database_provider_rw().unwrap();
provider
.save_stage_checkpoint(StageId::IndexAccountHistory, StageCheckpoint::new(30))
.save_stage_checkpoint(StageId::IndexAccountHistory, StageCheckpoint::new(100))
.unwrap();
provider.commit().unwrap();
}
let provider = factory.database_provider_ro().unwrap();
// RocksDB has sentinel entries matching the checkpoint - consistent state
// RocksDB has only sentinel entries (no completed shards) but checkpoint is set.
// This is treated as a first-run/migration scenario - no unwind needed.
let result = rocksdb.check_consistency(&provider).unwrap();
assert_eq!(result, None, "Sentinel-only entries matching checkpoint should be consistent");
assert_eq!(
result, None,
"Sentinel-only entries with checkpoint should be treated as first run"
);
}
#[test]

View File

@@ -4,10 +4,7 @@ mod invariants;
mod metrics;
mod provider;
#[allow(unused_imports)]
pub(crate) use provider::{
PendingHistory, PendingHistoryWrites, PendingRocksDBBatches, RocksDBWriteCtx,
};
pub(crate) use provider::{PendingRocksDBBatches, RocksDBWriteCtx};
pub use provider::{
RocksDBBatch, RocksDBBuilder, RocksDBProvider, RocksDBRawIter, RocksDBTableStats, RocksTx,
};

View File

@@ -15,7 +15,7 @@ use reth_db_api::{
table::{Compress, Decode, Decompress, Encode, Table},
tables, BlockNumberList, DatabaseError,
};
use reth_primitives_traits::{AlloyBlockHeader as _, BlockBody as _};
use reth_primitives_traits::BlockBody as _;
use reth_prune_types::PruneMode;
use reth_storage_errors::{
db::{DatabaseErrorInfo, DatabaseWriteError, DatabaseWriteOperation, LogLevel},
@@ -57,24 +57,6 @@ pub struct RocksDBTableStats {
pub pending_compaction_bytes: u64,
}
/// Pending history writes that are accumulated across multiple `save_blocks()` calls
/// and materialized into `RocksDB` shards at commit time.
///
/// This exists because [`RocksDBBatch::append_account_history_shard`] and
/// [`RocksDBBatch::append_storage_history_shard`] read from committed state, not pending writes.
/// By accumulating all history deltas in memory and materializing once at commit time, we ensure
/// each key is written exactly once with all its indices.
#[derive(Debug, Default)]
pub(crate) struct PendingHistoryWrites {
/// Account history: address -> list of block numbers where the account was modified.
pub accounts: BTreeMap<Address, Vec<u64>>,
/// Storage history: (address, slot) -> list of block numbers where the storage was modified.
pub storages: BTreeMap<(Address, B256), Vec<u64>>,
}
/// Pending history writes type alias.
pub(crate) type PendingHistory = Arc<Mutex<PendingHistoryWrites>>;
/// Context for `RocksDB` block writes.
#[derive(Clone)]
pub(crate) struct RocksDBWriteCtx {
@@ -86,9 +68,6 @@ pub(crate) struct RocksDBWriteCtx {
pub storage_settings: StorageSettings,
/// Pending batches to push to after writing.
pub pending_batches: PendingRocksDBBatches,
/// Pending history writes accumulated across `save_blocks()` calls.
/// Materialized into `RocksDB` shards at commit time.
pub pending_history: PendingHistory,
}
impl fmt::Debug for RocksDBWriteCtx {
@@ -98,7 +77,6 @@ impl fmt::Debug for RocksDBWriteCtx {
.field("prune_tx_lookup", &self.prune_tx_lookup)
.field("storage_settings", &self.storage_settings)
.field("pending_batches", &"<pending batches>")
.field("pending_history", &"<pending history>")
.finish()
}
}
@@ -616,7 +594,7 @@ impl RocksDBProvider {
let write_options = WriteOptions::default();
let txn_options = OptimisticTransactionOptions::default();
let inner = self.0.db_rw().transaction_opt(&write_options, &txn_options);
RocksTx { inner, provider: self, assume_history_complete: false }
RocksTx { inner, provider: self }
}
/// Creates a new batch for atomic writes.
@@ -1079,109 +1057,66 @@ impl RocksDBProvider {
Ok(())
}
/// Accumulates account history indices for the given blocks.
///
/// Indices are accumulated in `ctx.pending_history` and materialized at commit time
/// by [`RocksDBProvider::commit_pending_history`]. This ensures each key is written
/// exactly once, working around `RocksDB` batch reads not seeing uncommitted writes.
///
/// Only accounts with info changes (nonce, balance, `code_hash`) or lifecycle changes
/// (created/destroyed) are included, matching MDBX `AccountChangeSets` behavior.
/// Writes account history indices for the given blocks.
#[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
fn write_account_history<N: reth_node_types::NodePrimitives>(
&self,
blocks: &[ExecutedBlock<N>],
ctx: &RocksDBWriteCtx,
) -> ProviderResult<()> {
let mut local: BTreeMap<Address, Vec<u64>> = BTreeMap::new();
for block in blocks {
let block_number = block.recovered_block().number();
let mut batch = self.batch();
let mut account_history: BTreeMap<Address, Vec<u64>> = BTreeMap::new();
for (block_idx, block) in blocks.iter().enumerate() {
let block_number = ctx.first_block_number + block_idx as u64;
let bundle = &block.execution_outcome().state;
// Only record accounts where account-info changed OR account was destroyed.
// Skip accounts that only had storage changes - this matches MDBX semantics
// where AccountsHistory tracks account-level changes, not storage-only touches.
for (&address, account) in bundle.state() {
let info_changed = account.is_info_changed();
let was_destroyed = account.was_destroyed();
if info_changed || was_destroyed {
local.entry(address).or_default().push(block_number);
if account.is_info_changed() || account.was_destroyed() {
account_history.entry(address).or_default().push(block_number);
}
}
}
let mut pending = ctx.pending_history.lock();
for (address, mut indices) in local {
pending.accounts.entry(address).or_default().append(&mut indices);
// Write account history using proper shard append logic
for (address, indices) in account_history {
batch.append_account_history_shard(address, indices)?;
}
ctx.pending_batches.lock().push(batch.into_inner());
Ok(())
}
/// Accumulates storage history indices for the given blocks.
///
/// Indices are accumulated in `ctx.pending_history` and materialized at commit time
/// by [`RocksDBProvider::commit_pending_history`]. This ensures each key is written
/// exactly once, working around `RocksDB` batch reads not seeing uncommitted writes.
///
/// Only storage slots with actual value changes are included, matching MDBX behavior.
/// Writes storage history indices for the given blocks.
#[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
fn write_storage_history<N: reth_node_types::NodePrimitives>(
&self,
blocks: &[ExecutedBlock<N>],
ctx: &RocksDBWriteCtx,
) -> ProviderResult<()> {
let mut local: BTreeMap<(Address, B256), Vec<u64>> = BTreeMap::new();
for block in blocks {
let block_number = block.recovered_block().number();
let mut batch = self.batch();
let mut storage_history: BTreeMap<(Address, B256), Vec<u64>> = BTreeMap::new();
for (block_idx, block) in blocks.iter().enumerate() {
let block_number = ctx.first_block_number + block_idx as u64;
let bundle = &block.execution_outcome().state;
for (&address, account) in bundle.state() {
// Only record storage slots that actually changed value.
// This matches MDBX semantics where StoragesHistory only tracks
// slots with value changes, not just touched slots.
for (&slot, storage_slot) in &account.storage {
if storage_slot.is_changed() {
let key = B256::new(slot.to_be_bytes());
local.entry((address, key)).or_default().push(block_number);
storage_history.entry((address, key)).or_default().push(block_number);
}
}
}
}
let mut pending = ctx.pending_history.lock();
for ((address, slot), mut indices) in local {
pending.storages.entry((address, slot)).or_default().append(&mut indices);
}
Ok(())
}
/// Materializes pending history writes into `RocksDB` shards and commits them.
///
/// This is called at provider commit time, AFTER MDBX has committed. Each key is
/// written exactly once, ensuring [`RocksDBBatch::append_account_history_shard`] and
/// [`RocksDBBatch::append_storage_history_shard`] see committed state.
pub(crate) fn commit_pending_history(
&self,
history: PendingHistoryWrites,
) -> ProviderResult<()> {
if history.accounts.is_empty() && history.storages.is_empty() {
return Ok(());
}
let mut batch = self.batch();
// Write account history shards
for (address, mut indices) in history.accounts {
// Sort and dedup in case of duplicate block numbers from multiple save_blocks calls
indices.sort_unstable();
indices.dedup();
batch.append_account_history_shard(address, indices)?;
}
// Write storage history shards
for ((address, slot), mut indices) in history.storages {
indices.sort_unstable();
indices.dedup();
// Write storage history using proper shard append logic
for ((address, slot), indices) in storage_history {
batch.append_storage_history_shard(address, slot, indices)?;
}
if !batch.is_empty() {
batch.commit()?;
}
ctx.pending_batches.lock().push(batch.into_inner());
Ok(())
}
}
@@ -1596,14 +1531,10 @@ impl<'a> RocksDBBatch<'a> {
/// - Iteration over uncommitted data
///
/// Note: `Transaction` is `Send` but NOT `Sync`. This wrapper does not implement
/// `DbTx`/`DbTxMut` traits directly; use `RocksDB`-specific methods instead.
/// `DbTx`/`DbTxMut` traits directly; use RocksDB-specific methods instead.
pub struct RocksTx<'db> {
inner: Transaction<'db, OptimisticTransactionDB>,
provider: &'db RocksDBProvider,
/// When true, assume `RocksDB` has complete history (like MDBX) and return `NotYetWritten`
/// when querying before the first history entry. When false (default), return
/// `MaybeInPlainState` for hybrid storage safety.
assume_history_complete: bool,
}
impl fmt::Debug for RocksTx<'_> {
@@ -1613,20 +1544,6 @@ impl fmt::Debug for RocksTx<'_> {
}
impl<'db> RocksTx<'db> {
/// Sets the `assume_history_complete` flag to true.
///
/// When enabled, history queries will return `NotYetWritten` (like MDBX) instead of
/// `MaybeInPlainState` when querying before the first history entry.
///
/// WARNING: Only use in tests where `RocksDB` has complete history (identical to MDBX).
/// In production, `RocksDB` may have partial history, so this flag would cause incorrect
/// behavior (treating accounts as non-existent when they exist in MDBX).
#[cfg(test)]
pub const fn with_assume_history_complete(mut self) -> Self {
self.assume_history_complete = true;
self
}
/// Gets a value from the specified table. Sees uncommitted writes in this transaction.
pub fn get<T: Table>(&self, key: T::Key) -> ProviderResult<Option<T::Value>> {
let encoded_key = key.encode();
@@ -1793,19 +1710,13 @@ impl<'db> RocksTx<'db> {
where
T: Table<Value = BlockNumberList>,
{
// When RocksDB can't find history for an address, we must fall back to plain state.
//
// This is critical for hybrid storage: RocksDB only contains history indices for blocks
// written AFTER it was enabled. Accounts modified before that exist only in MDBX.
// Returning `NotYetWritten` would incorrectly treat them as non-existent (nonce 0).
//
// When `assume_history_complete` is true (for tests with identical data), return
// `NotYetWritten` to match MDBX semantics.
// History may be pruned if a lowest available block is set.
let is_maybe_pruned = lowest_available_block_number.is_some();
let fallback = || {
Ok(if self.assume_history_complete {
HistoryInfo::NotYetWritten
} else {
Ok(if is_maybe_pruned {
HistoryInfo::MaybeInPlainState
} else {
HistoryInfo::NotYetWritten
})
};
@@ -1856,27 +1767,11 @@ impl<'db> RocksTx<'db> {
false
};
// For RocksDB, we must handle `is_before_first_write` specially.
//
// When RocksDB says the target block is before the first recorded write:
// - MDBX would return `NotYetWritten` (account doesn't exist) because MDBX has full history
// - RocksDB should return `MaybeInPlainState` because the account may exist in MDBX from
// before RocksDB was enabled
//
// We use `from_lookup` but then override `NotYetWritten` -> `MaybeInPlainState`
// unless `assume_history_complete` is true (for tests with identical data).
let result = HistoryInfo::from_lookup(
Ok(HistoryInfo::from_lookup(
found_block,
is_before_first_write,
lowest_available_block_number,
);
Ok(match result {
HistoryInfo::NotYetWritten if !self.assume_history_complete => {
HistoryInfo::MaybeInPlainState
}
other => other,
})
))
}
/// Returns an error if the raw iterator is in an invalid state due to an I/O error.
@@ -2372,7 +2267,7 @@ mod tests {
/// Tests the edge case where block < `lowest_available_block_number`.
/// This case cannot be tested via `HistoricalStateProviderRef` (which errors before lookup),
/// so we keep this `RocksDB`-specific test to verify the low-level behavior.
/// so we keep this RocksDB-specific test to verify the low-level behavior.
#[test]
fn test_account_history_info_pruned_before_first_entry() {
let temp_dir = TempDir::new().unwrap();

View File

@@ -32,13 +32,6 @@ pub struct RocksDBTableStats {
pub pending_compaction_bytes: u64,
}
/// Pending history writes (stub - empty struct).
#[derive(Debug, Default)]
pub(crate) struct PendingHistoryWrites;
/// Pending history writes type alias (stub).
pub(crate) type PendingHistory = Arc<Mutex<PendingHistoryWrites>>;
/// Context for `RocksDB` block writes (stub).
#[derive(Debug, Clone)]
#[allow(dead_code)]
@@ -51,8 +44,6 @@ pub(crate) struct RocksDBWriteCtx {
pub storage_settings: StorageSettings,
/// Pending batches (stub - unused).
pub pending_batches: PendingRocksDBBatches,
/// Pending history writes (stub - unused).
pub pending_history: PendingHistory,
}
/// A stub `RocksDB` provider.