mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-04-30 03:01:58 -04:00
Compare commits
5 Commits
yk/cache-l
...
wt-pr2
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6be57ce9e1 | ||
|
|
b997bee71b | ||
|
|
e3c7413747 | ||
|
|
0bf2335c8c | ||
|
|
1f7a5acae1 |
@@ -1351,7 +1351,8 @@ mod rocksdb_tests {
|
||||
|
||||
// Run queries against both backends using EitherReader
|
||||
let mdbx_ro = factory.database_provider_ro().unwrap();
|
||||
let rocks_tx = rocks_provider.tx();
|
||||
// Use `with_assume_history_complete()` since both backends have identical data
|
||||
let rocks_tx = rocks_provider.tx().with_assume_history_complete();
|
||||
|
||||
for (i, query) in queries.iter().enumerate() {
|
||||
// MDBX query via EitherReader
|
||||
@@ -1443,7 +1444,8 @@ mod rocksdb_tests {
|
||||
|
||||
// Run queries against both backends using EitherReader
|
||||
let mdbx_ro = factory.database_provider_ro().unwrap();
|
||||
let rocks_tx = rocks_provider.tx();
|
||||
// Use `with_assume_history_complete()` since both backends have identical data
|
||||
let rocks_tx = rocks_provider.tx().with_assume_history_complete();
|
||||
|
||||
for (i, query) in queries.iter().enumerate() {
|
||||
// MDBX query via EitherReader
|
||||
|
||||
@@ -2,7 +2,7 @@ use crate::{
|
||||
changesets_utils::StorageRevertsIter,
|
||||
providers::{
|
||||
database::{chain::ChainStorage, metrics},
|
||||
rocksdb::{PendingRocksDBBatches, RocksDBProvider, RocksDBWriteCtx},
|
||||
rocksdb::{PendingHistory, PendingRocksDBBatches, RocksDBProvider, RocksDBWriteCtx},
|
||||
static_file::{StaticFileWriteCtx, StaticFileWriter},
|
||||
NodeTypesForProvider, StaticFileProvider,
|
||||
},
|
||||
@@ -186,6 +186,10 @@ pub struct DatabaseProvider<TX, N: NodeTypes> {
|
||||
/// Pending `RocksDB` batches to be committed at provider commit time.
|
||||
#[cfg_attr(not(all(unix, feature = "rocksdb")), allow(dead_code))]
|
||||
pending_rocksdb_batches: PendingRocksDBBatches,
|
||||
/// Pending history writes accumulated across `save_blocks()` calls.
|
||||
/// Materialized into `RocksDB` shards at commit time after MDBX commits.
|
||||
#[cfg_attr(not(all(unix, feature = "rocksdb")), allow(dead_code))]
|
||||
pending_rocksdb_history: PendingHistory,
|
||||
/// Minimum distance from tip required for pruning
|
||||
minimum_pruning_distance: u64,
|
||||
/// Database provider metrics
|
||||
@@ -204,6 +208,7 @@ impl<TX: Debug, N: NodeTypes> Debug for DatabaseProvider<TX, N> {
|
||||
.field("rocksdb_provider", &self.rocksdb_provider)
|
||||
.field("changeset_cache", &self.changeset_cache)
|
||||
.field("pending_rocksdb_batches", &"<pending batches>")
|
||||
.field("pending_rocksdb_history", &"<pending history>")
|
||||
.field("minimum_pruning_distance", &self.minimum_pruning_distance)
|
||||
.finish()
|
||||
}
|
||||
@@ -337,6 +342,7 @@ impl<TX: DbTxMut, N: NodeTypes> DatabaseProvider<TX, N> {
|
||||
rocksdb_provider,
|
||||
changeset_cache,
|
||||
pending_rocksdb_batches: Default::default(),
|
||||
pending_rocksdb_history: Default::default(),
|
||||
minimum_pruning_distance: MINIMUM_PRUNING_DISTANCE,
|
||||
metrics: metrics::DatabaseProviderMetrics::default(),
|
||||
}
|
||||
@@ -410,6 +416,7 @@ impl<TX: DbTx + DbTxMut + 'static, N: NodeTypesForProvider> DatabaseProvider<TX,
|
||||
prune_tx_lookup: self.prune_modes.transaction_lookup,
|
||||
storage_settings: self.cached_storage_settings(),
|
||||
pending_batches: self.pending_rocksdb_batches.clone(),
|
||||
pending_history: self.pending_rocksdb_history.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -876,6 +883,7 @@ impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
|
||||
rocksdb_provider,
|
||||
changeset_cache,
|
||||
pending_rocksdb_batches: Default::default(),
|
||||
pending_rocksdb_history: Default::default(),
|
||||
minimum_pruning_distance: MINIMUM_PRUNING_DISTANCE,
|
||||
metrics: metrics::DatabaseProviderMetrics::default(),
|
||||
}
|
||||
@@ -3478,6 +3486,19 @@ impl<TX: DbTx + 'static, N: NodeTypes + 'static> DBProvider for DatabaseProvider
|
||||
}
|
||||
|
||||
/// Commit database transaction, static files, and pending `RocksDB` batches.
|
||||
///
|
||||
/// # Commit Order (Critical for Correctness)
|
||||
///
|
||||
/// `RocksDB` history indices must commit AFTER MDBX changesets to prevent a race
|
||||
/// condition where history indices point to non-existent changesets:
|
||||
///
|
||||
/// 1. Static files finalize (headers, transactions, receipts)
|
||||
/// 2. MDBX commits (changesets become visible)
|
||||
/// 3. `RocksDB` batches commit (non-history data like tx hashes)
|
||||
/// 4. `RocksDB` history commits (account/storage history indices)
|
||||
///
|
||||
/// This ordering ensures that when a reader follows a `RocksDB` history index
|
||||
/// to a changeset, the changeset exists in MDBX.
|
||||
fn commit(self) -> ProviderResult<()> {
|
||||
// For unwinding it makes more sense to commit the database first, since if
|
||||
// it is interrupted before the static files commit, we can just
|
||||
@@ -3488,10 +3509,15 @@ impl<TX: DbTx + 'static, N: NodeTypes + 'static> DBProvider for DatabaseProvider
|
||||
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
{
|
||||
// Commit non-history batches
|
||||
let batches = std::mem::take(&mut *self.pending_rocksdb_batches.lock());
|
||||
for batch in batches {
|
||||
self.rocksdb_provider.commit_batch(batch)?;
|
||||
}
|
||||
|
||||
// Commit pending history (after MDBX, so changesets exist)
|
||||
let history = std::mem::take(&mut *self.pending_rocksdb_history.lock());
|
||||
self.rocksdb_provider.commit_pending_history(history)?;
|
||||
}
|
||||
|
||||
self.static_file_provider.commit()?;
|
||||
@@ -3503,20 +3529,30 @@ impl<TX: DbTx + 'static, N: NodeTypes + 'static> DBProvider for DatabaseProvider
|
||||
self.static_file_provider.finalize()?;
|
||||
timings.sf = start.elapsed();
|
||||
|
||||
// CRITICAL: MDBX must commit BEFORE RocksDB history indices.
|
||||
// This prevents a race where history says "look at changeset N" but
|
||||
// the changeset doesn't exist yet.
|
||||
let start = Instant::now();
|
||||
self.tx.commit()?;
|
||||
timings.mdbx = start.elapsed();
|
||||
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
{
|
||||
let start = Instant::now();
|
||||
|
||||
// Commit non-history batches (tx hashes, etc.)
|
||||
let batches = std::mem::take(&mut *self.pending_rocksdb_batches.lock());
|
||||
for batch in batches {
|
||||
self.rocksdb_provider.commit_batch(batch)?;
|
||||
}
|
||||
|
||||
// Commit pending history (after MDBX, so changesets exist)
|
||||
let history = std::mem::take(&mut *self.pending_rocksdb_history.lock());
|
||||
self.rocksdb_provider.commit_pending_history(history)?;
|
||||
|
||||
timings.rocksdb = start.elapsed();
|
||||
}
|
||||
|
||||
let start = Instant::now();
|
||||
self.tx.commit()?;
|
||||
timings.mdbx = start.elapsed();
|
||||
|
||||
self.metrics.record_commit(&timings);
|
||||
}
|
||||
|
||||
|
||||
@@ -253,14 +253,21 @@ impl RocksDBProvider {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
// Find the max highest_block_number (excluding u64::MAX sentinel) across all
|
||||
// entries. Also track if we found any non-sentinel entries.
|
||||
// Find the max block number across all entries, including sentinel shards.
|
||||
let mut max_highest_block = 0u64;
|
||||
let mut max_sentinel_block = 0u64;
|
||||
let mut found_non_sentinel = false;
|
||||
|
||||
for result in self.iter::<tables::StoragesHistory>()? {
|
||||
let (key, _) = result?;
|
||||
let (key, value) = result?;
|
||||
let highest = key.sharded_key.highest_block_number;
|
||||
if highest != u64::MAX {
|
||||
if highest == u64::MAX {
|
||||
if let Some(max_in_shard) = value.iter().max() &&
|
||||
max_in_shard > max_sentinel_block
|
||||
{
|
||||
max_sentinel_block = max_in_shard;
|
||||
}
|
||||
} else {
|
||||
found_non_sentinel = true;
|
||||
if highest > max_highest_block {
|
||||
max_highest_block = highest;
|
||||
@@ -268,31 +275,34 @@ impl RocksDBProvider {
|
||||
}
|
||||
}
|
||||
|
||||
// If all entries are sentinel entries (u64::MAX), treat as first-run scenario.
|
||||
// This means no completed shards exist (only sentinel shards with
|
||||
// highest_block_number=u64::MAX), so no actual history has been indexed.
|
||||
if !found_non_sentinel {
|
||||
let effective_max = max_highest_block.max(max_sentinel_block);
|
||||
|
||||
// If we only have sentinel shards and they're all empty, we're consistent
|
||||
if !found_non_sentinel && max_sentinel_block == 0 {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
// If any entry has highest_block > checkpoint, prune excess
|
||||
if max_highest_block > checkpoint {
|
||||
// If any entry has data > checkpoint, prune excess
|
||||
if effective_max > checkpoint {
|
||||
tracing::info!(
|
||||
target: "reth::providers::rocksdb",
|
||||
rocks_highest = max_highest_block,
|
||||
rocks_highest = effective_max,
|
||||
max_non_sentinel = max_highest_block,
|
||||
max_sentinel = max_sentinel_block,
|
||||
checkpoint,
|
||||
"StoragesHistory ahead of checkpoint, pruning excess data"
|
||||
);
|
||||
self.prune_storages_history_above(checkpoint)?;
|
||||
} else if max_highest_block < checkpoint {
|
||||
return Ok(None);
|
||||
} else if effective_max < checkpoint {
|
||||
// RocksDB is behind checkpoint, return highest block to signal unwind needed
|
||||
tracing::warn!(
|
||||
target: "reth::providers::rocksdb",
|
||||
rocks_highest = max_highest_block,
|
||||
rocks_highest = effective_max,
|
||||
checkpoint,
|
||||
"StoragesHistory behind checkpoint, unwind needed"
|
||||
);
|
||||
return Ok(Some(max_highest_block));
|
||||
return Ok(Some(effective_max));
|
||||
}
|
||||
|
||||
Ok(None)
|
||||
@@ -309,25 +319,42 @@ impl RocksDBProvider {
|
||||
/// For `StoragesHistory`, the key contains `highest_block_number`, so we can iterate
|
||||
/// and delete entries where `key.sharded_key.highest_block_number > max_block`.
|
||||
///
|
||||
/// Sentinel shards (with `highest_block_number = u64::MAX`) require special handling:
|
||||
/// we must read their contents and filter out block numbers > `max_block`.
|
||||
///
|
||||
/// TODO(<https://github.com/paradigmxyz/reth/issues/20417>): this iterates the whole table,
|
||||
/// which is inefficient. Use changeset-based pruning instead.
|
||||
fn prune_storages_history_above(&self, max_block: BlockNumber) -> ProviderResult<()> {
|
||||
use reth_db_api::models::storage_sharded_key::StorageShardedKey;
|
||||
use reth_db_api::{models::storage_sharded_key::StorageShardedKey, BlockNumberList};
|
||||
|
||||
let mut to_delete: Vec<StorageShardedKey> = Vec::new();
|
||||
let mut to_rewrite: Vec<(StorageShardedKey, BlockNumberList)> = Vec::new();
|
||||
|
||||
for result in self.iter::<tables::StoragesHistory>()? {
|
||||
let (key, _) = result?;
|
||||
let (key, value) = result?;
|
||||
let highest_block = key.sharded_key.highest_block_number;
|
||||
if max_block == 0 || (highest_block != u64::MAX && highest_block > max_block) {
|
||||
|
||||
if max_block == 0 {
|
||||
to_delete.push(key);
|
||||
} else if highest_block == u64::MAX {
|
||||
let filtered: Vec<u64> = value.iter().filter(|&bn| bn <= max_block).collect();
|
||||
if filtered.is_empty() {
|
||||
to_delete.push(key);
|
||||
} else if filtered.len() != value.len() as usize {
|
||||
to_rewrite.push((key, BlockNumberList::new_pre_sorted(filtered)));
|
||||
}
|
||||
} else if highest_block > max_block {
|
||||
to_delete.push(key);
|
||||
}
|
||||
}
|
||||
|
||||
let deleted = to_delete.len();
|
||||
if deleted > 0 {
|
||||
let rewritten = to_rewrite.len();
|
||||
if deleted > 0 || rewritten > 0 {
|
||||
tracing::info!(
|
||||
target: "reth::providers::rocksdb",
|
||||
deleted_count = deleted,
|
||||
rewritten_count = rewritten,
|
||||
max_block,
|
||||
"Pruning StoragesHistory entries"
|
||||
);
|
||||
@@ -336,6 +363,9 @@ impl RocksDBProvider {
|
||||
for key in to_delete {
|
||||
batch.delete::<tables::StoragesHistory>(key)?;
|
||||
}
|
||||
for (key, value) in to_rewrite {
|
||||
batch.put::<tables::StoragesHistory>(key, &value)?;
|
||||
}
|
||||
batch.commit()?;
|
||||
}
|
||||
|
||||
@@ -374,14 +404,24 @@ impl RocksDBProvider {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
// Find the max highest_block_number (excluding u64::MAX sentinel) across all
|
||||
// entries. Also track if we found any non-sentinel entries.
|
||||
// Find the max block number across all entries, including sentinel shards.
|
||||
// For sentinel shards (highest_block_number == u64::MAX), we need to look
|
||||
// at the actual block numbers stored in the shard.
|
||||
let mut max_highest_block = 0u64;
|
||||
let mut max_sentinel_block = 0u64;
|
||||
let mut found_non_sentinel = false;
|
||||
|
||||
for result in self.iter::<tables::AccountsHistory>()? {
|
||||
let (key, _) = result?;
|
||||
let (key, value) = result?;
|
||||
let highest = key.highest_block_number;
|
||||
if highest != u64::MAX {
|
||||
if highest == u64::MAX {
|
||||
// Sentinel shard: check the actual max block number in the shard
|
||||
if let Some(max_in_shard) = value.iter().max() &&
|
||||
max_in_shard > max_sentinel_block
|
||||
{
|
||||
max_sentinel_block = max_in_shard;
|
||||
}
|
||||
} else {
|
||||
found_non_sentinel = true;
|
||||
if highest > max_highest_block {
|
||||
max_highest_block = highest;
|
||||
@@ -389,18 +429,22 @@ impl RocksDBProvider {
|
||||
}
|
||||
}
|
||||
|
||||
// If all entries are sentinel entries (u64::MAX), treat as first-run scenario.
|
||||
// This means no completed shards exist (only sentinel shards with
|
||||
// highest_block_number=u64::MAX), so no actual history has been indexed.
|
||||
if !found_non_sentinel {
|
||||
// Use the higher of the two maxes
|
||||
let effective_max = max_highest_block.max(max_sentinel_block);
|
||||
|
||||
// If we only have sentinel shards and they're all within checkpoint,
|
||||
// we're consistent
|
||||
if !found_non_sentinel && max_sentinel_block == 0 {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
// If any entry has highest_block > checkpoint, prune excess
|
||||
if max_highest_block > checkpoint {
|
||||
// If any entry has data > checkpoint, prune excess
|
||||
if effective_max > checkpoint {
|
||||
tracing::info!(
|
||||
target: "reth::providers::rocksdb",
|
||||
rocks_highest = max_highest_block,
|
||||
rocks_highest = effective_max,
|
||||
max_non_sentinel = max_highest_block,
|
||||
max_sentinel = max_sentinel_block,
|
||||
checkpoint,
|
||||
"AccountsHistory ahead of checkpoint, pruning excess data"
|
||||
);
|
||||
@@ -409,14 +453,14 @@ impl RocksDBProvider {
|
||||
}
|
||||
|
||||
// If RocksDB is behind the checkpoint, request an unwind to rebuild.
|
||||
if max_highest_block < checkpoint {
|
||||
if effective_max < checkpoint {
|
||||
tracing::warn!(
|
||||
target: "reth::providers::rocksdb",
|
||||
rocks_highest = max_highest_block,
|
||||
rocks_highest = effective_max,
|
||||
checkpoint,
|
||||
"AccountsHistory behind checkpoint, unwind needed"
|
||||
);
|
||||
return Ok(Some(max_highest_block));
|
||||
return Ok(Some(effective_max));
|
||||
}
|
||||
|
||||
Ok(None)
|
||||
@@ -434,26 +478,47 @@ impl RocksDBProvider {
|
||||
/// `highest_block_number`, so we can iterate and delete entries where
|
||||
/// `key.highest_block_number > max_block`.
|
||||
///
|
||||
/// Sentinel shards (with `highest_block_number = u64::MAX`) require special handling:
|
||||
/// we must read their contents and filter out block numbers > `max_block`.
|
||||
///
|
||||
/// TODO(<https://github.com/paradigmxyz/reth/issues/20417>): this iterates the whole table,
|
||||
/// which is inefficient. Use changeset-based pruning instead.
|
||||
fn prune_accounts_history_above(&self, max_block: BlockNumber) -> ProviderResult<()> {
|
||||
use alloy_primitives::Address;
|
||||
use reth_db_api::models::ShardedKey;
|
||||
use reth_db_api::{models::ShardedKey, BlockNumberList};
|
||||
|
||||
let mut to_delete: Vec<ShardedKey<Address>> = Vec::new();
|
||||
let mut to_rewrite: Vec<(ShardedKey<Address>, BlockNumberList)> = Vec::new();
|
||||
|
||||
for result in self.iter::<tables::AccountsHistory>()? {
|
||||
let (key, _) = result?;
|
||||
let (key, value) = result?;
|
||||
let highest_block = key.highest_block_number;
|
||||
if max_block == 0 || (highest_block != u64::MAX && highest_block > max_block) {
|
||||
|
||||
if max_block == 0 {
|
||||
// Clear everything
|
||||
to_delete.push(key);
|
||||
} else if highest_block == u64::MAX {
|
||||
// Sentinel shard: filter out block numbers > max_block
|
||||
let filtered: Vec<u64> = value.iter().filter(|&bn| bn <= max_block).collect();
|
||||
if filtered.is_empty() {
|
||||
to_delete.push(key);
|
||||
} else if filtered.len() != value.len() as usize {
|
||||
// Some entries were filtered out, rewrite the shard
|
||||
to_rewrite.push((key, BlockNumberList::new_pre_sorted(filtered)));
|
||||
}
|
||||
} else if highest_block > max_block {
|
||||
// Non-sentinel shard above max_block: delete
|
||||
to_delete.push(key);
|
||||
}
|
||||
}
|
||||
|
||||
let deleted = to_delete.len();
|
||||
if deleted > 0 {
|
||||
let rewritten = to_rewrite.len();
|
||||
if deleted > 0 || rewritten > 0 {
|
||||
tracing::info!(
|
||||
target: "reth::providers::rocksdb",
|
||||
deleted_count = deleted,
|
||||
rewritten_count = rewritten,
|
||||
max_block,
|
||||
"Pruning AccountsHistory entries"
|
||||
);
|
||||
@@ -462,6 +527,9 @@ impl RocksDBProvider {
|
||||
for key in to_delete {
|
||||
batch.delete::<tables::AccountsHistory>(key)?;
|
||||
}
|
||||
for (key, value) in to_rewrite {
|
||||
batch.put::<tables::AccountsHistory>(key, &value)?;
|
||||
}
|
||||
batch.commit()?;
|
||||
}
|
||||
|
||||
@@ -996,6 +1064,7 @@ mod tests {
|
||||
// This simulates a scenario where history tracking started but no shards were completed
|
||||
let key_sentinel_1 = StorageShardedKey::new(Address::ZERO, B256::ZERO, u64::MAX);
|
||||
let key_sentinel_2 = StorageShardedKey::new(Address::random(), B256::random(), u64::MAX);
|
||||
// Sentinel shards contain blocks [10, 20, 30], so max block in shard = 30
|
||||
let block_list = BlockNumberList::new_pre_sorted([10, 20, 30]);
|
||||
rocksdb.put::<tables::StoragesHistory>(key_sentinel_1, &block_list).unwrap();
|
||||
rocksdb.put::<tables::StoragesHistory>(key_sentinel_2, &block_list).unwrap();
|
||||
@@ -1009,24 +1078,21 @@ mod tests {
|
||||
StorageSettings::legacy().with_storages_history_in_rocksdb(true),
|
||||
);
|
||||
|
||||
// Set a checkpoint indicating we should have processed up to block 100
|
||||
// Checkpoint = 30 matches sentinel shard max block, simulating "consistent" state
|
||||
// where indexing is complete through the checkpoint block
|
||||
{
|
||||
let provider = factory.database_provider_rw().unwrap();
|
||||
provider
|
||||
.save_stage_checkpoint(StageId::IndexStorageHistory, StageCheckpoint::new(100))
|
||||
.save_stage_checkpoint(StageId::IndexStorageHistory, StageCheckpoint::new(30))
|
||||
.unwrap();
|
||||
provider.commit().unwrap();
|
||||
}
|
||||
|
||||
let provider = factory.database_provider_ro().unwrap();
|
||||
|
||||
// RocksDB has only sentinel entries (no completed shards) but checkpoint is set.
|
||||
// This is treated as a first-run/migration scenario - no unwind needed.
|
||||
// RocksDB has sentinel entries matching the checkpoint - consistent state
|
||||
let result = rocksdb.check_consistency(&provider).unwrap();
|
||||
assert_eq!(
|
||||
result, None,
|
||||
"Sentinel-only entries with checkpoint should be treated as first run"
|
||||
);
|
||||
assert_eq!(result, None, "Sentinel-only entries matching checkpoint should be consistent");
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -1042,6 +1108,7 @@ mod tests {
|
||||
// Insert ONLY sentinel entries (highest_block_number = u64::MAX)
|
||||
let key_sentinel_1 = ShardedKey::new(Address::ZERO, u64::MAX);
|
||||
let key_sentinel_2 = ShardedKey::new(Address::random(), u64::MAX);
|
||||
// Sentinel shards contain blocks [10, 20, 30], so max block in shard = 30
|
||||
let block_list = BlockNumberList::new_pre_sorted([10, 20, 30]);
|
||||
rocksdb.put::<tables::AccountsHistory>(key_sentinel_1, &block_list).unwrap();
|
||||
rocksdb.put::<tables::AccountsHistory>(key_sentinel_2, &block_list).unwrap();
|
||||
@@ -1055,24 +1122,21 @@ mod tests {
|
||||
StorageSettings::legacy().with_account_history_in_rocksdb(true),
|
||||
);
|
||||
|
||||
// Set a checkpoint indicating we should have processed up to block 100
|
||||
// Checkpoint = 30 matches sentinel shard max block, simulating "consistent" state
|
||||
// where indexing is complete through the checkpoint block
|
||||
{
|
||||
let provider = factory.database_provider_rw().unwrap();
|
||||
provider
|
||||
.save_stage_checkpoint(StageId::IndexAccountHistory, StageCheckpoint::new(100))
|
||||
.save_stage_checkpoint(StageId::IndexAccountHistory, StageCheckpoint::new(30))
|
||||
.unwrap();
|
||||
provider.commit().unwrap();
|
||||
}
|
||||
|
||||
let provider = factory.database_provider_ro().unwrap();
|
||||
|
||||
// RocksDB has only sentinel entries (no completed shards) but checkpoint is set.
|
||||
// This is treated as a first-run/migration scenario - no unwind needed.
|
||||
// RocksDB has sentinel entries matching the checkpoint - consistent state
|
||||
let result = rocksdb.check_consistency(&provider).unwrap();
|
||||
assert_eq!(
|
||||
result, None,
|
||||
"Sentinel-only entries with checkpoint should be treated as first run"
|
||||
);
|
||||
assert_eq!(result, None, "Sentinel-only entries matching checkpoint should be consistent");
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
@@ -4,7 +4,10 @@ mod invariants;
|
||||
mod metrics;
|
||||
mod provider;
|
||||
|
||||
pub(crate) use provider::{PendingRocksDBBatches, RocksDBWriteCtx};
|
||||
#[allow(unused_imports)]
|
||||
pub(crate) use provider::{
|
||||
PendingHistory, PendingHistoryWrites, PendingRocksDBBatches, RocksDBWriteCtx,
|
||||
};
|
||||
pub use provider::{
|
||||
RocksDBBatch, RocksDBBuilder, RocksDBProvider, RocksDBRawIter, RocksDBTableStats, RocksTx,
|
||||
};
|
||||
|
||||
@@ -15,7 +15,7 @@ use reth_db_api::{
|
||||
table::{Compress, Decode, Decompress, Encode, Table},
|
||||
tables, BlockNumberList, DatabaseError,
|
||||
};
|
||||
use reth_primitives_traits::BlockBody as _;
|
||||
use reth_primitives_traits::{AlloyBlockHeader as _, BlockBody as _};
|
||||
use reth_prune_types::PruneMode;
|
||||
use reth_storage_errors::{
|
||||
db::{DatabaseErrorInfo, DatabaseWriteError, DatabaseWriteOperation, LogLevel},
|
||||
@@ -57,6 +57,24 @@ pub struct RocksDBTableStats {
|
||||
pub pending_compaction_bytes: u64,
|
||||
}
|
||||
|
||||
/// Pending history writes that are accumulated across multiple `save_blocks()` calls
|
||||
/// and materialized into `RocksDB` shards at commit time.
|
||||
///
|
||||
/// This exists because [`RocksDBBatch::append_account_history_shard`] and
|
||||
/// [`RocksDBBatch::append_storage_history_shard`] read from committed state, not pending writes.
|
||||
/// By accumulating all history deltas in memory and materializing once at commit time, we ensure
|
||||
/// each key is written exactly once with all its indices.
|
||||
#[derive(Debug, Default)]
|
||||
pub(crate) struct PendingHistoryWrites {
|
||||
/// Account history: address -> list of block numbers where the account was modified.
|
||||
pub accounts: BTreeMap<Address, Vec<u64>>,
|
||||
/// Storage history: (address, slot) -> list of block numbers where the storage was modified.
|
||||
pub storages: BTreeMap<(Address, B256), Vec<u64>>,
|
||||
}
|
||||
|
||||
/// Pending history writes type alias.
|
||||
pub(crate) type PendingHistory = Arc<Mutex<PendingHistoryWrites>>;
|
||||
|
||||
/// Context for `RocksDB` block writes.
|
||||
#[derive(Clone)]
|
||||
pub(crate) struct RocksDBWriteCtx {
|
||||
@@ -68,6 +86,9 @@ pub(crate) struct RocksDBWriteCtx {
|
||||
pub storage_settings: StorageSettings,
|
||||
/// Pending batches to push to after writing.
|
||||
pub pending_batches: PendingRocksDBBatches,
|
||||
/// Pending history writes accumulated across `save_blocks()` calls.
|
||||
/// Materialized into `RocksDB` shards at commit time.
|
||||
pub pending_history: PendingHistory,
|
||||
}
|
||||
|
||||
impl fmt::Debug for RocksDBWriteCtx {
|
||||
@@ -77,6 +98,7 @@ impl fmt::Debug for RocksDBWriteCtx {
|
||||
.field("prune_tx_lookup", &self.prune_tx_lookup)
|
||||
.field("storage_settings", &self.storage_settings)
|
||||
.field("pending_batches", &"<pending batches>")
|
||||
.field("pending_history", &"<pending history>")
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
@@ -594,7 +616,7 @@ impl RocksDBProvider {
|
||||
let write_options = WriteOptions::default();
|
||||
let txn_options = OptimisticTransactionOptions::default();
|
||||
let inner = self.0.db_rw().transaction_opt(&write_options, &txn_options);
|
||||
RocksTx { inner, provider: self }
|
||||
RocksTx { inner, provider: self, assume_history_complete: false }
|
||||
}
|
||||
|
||||
/// Creates a new batch for atomic writes.
|
||||
@@ -1057,56 +1079,109 @@ impl RocksDBProvider {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Writes account history indices for the given blocks.
|
||||
/// Accumulates account history indices for the given blocks.
|
||||
///
|
||||
/// Indices are accumulated in `ctx.pending_history` and materialized at commit time
|
||||
/// by [`RocksDBProvider::commit_pending_history`]. This ensures each key is written
|
||||
/// exactly once, working around `RocksDB` batch reads not seeing uncommitted writes.
|
||||
///
|
||||
/// Only accounts with info changes (nonce, balance, `code_hash`) or lifecycle changes
|
||||
/// (created/destroyed) are included, matching MDBX `AccountChangeSets` behavior.
|
||||
#[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
|
||||
fn write_account_history<N: reth_node_types::NodePrimitives>(
|
||||
&self,
|
||||
blocks: &[ExecutedBlock<N>],
|
||||
ctx: &RocksDBWriteCtx,
|
||||
) -> ProviderResult<()> {
|
||||
let mut batch = self.batch();
|
||||
let mut account_history: BTreeMap<Address, Vec<u64>> = BTreeMap::new();
|
||||
for (block_idx, block) in blocks.iter().enumerate() {
|
||||
let block_number = ctx.first_block_number + block_idx as u64;
|
||||
let mut local: BTreeMap<Address, Vec<u64>> = BTreeMap::new();
|
||||
for block in blocks {
|
||||
let block_number = block.recovered_block().number();
|
||||
let bundle = &block.execution_outcome().state;
|
||||
for &address in bundle.state().keys() {
|
||||
account_history.entry(address).or_default().push(block_number);
|
||||
for (&address, account) in bundle.state() {
|
||||
let info_changed = account.is_info_changed();
|
||||
let was_destroyed = account.was_destroyed();
|
||||
|
||||
if info_changed || was_destroyed {
|
||||
local.entry(address).or_default().push(block_number);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Write account history using proper shard append logic
|
||||
for (address, indices) in account_history {
|
||||
batch.append_account_history_shard(address, indices)?;
|
||||
let mut pending = ctx.pending_history.lock();
|
||||
for (address, mut indices) in local {
|
||||
pending.accounts.entry(address).or_default().append(&mut indices);
|
||||
}
|
||||
ctx.pending_batches.lock().push(batch.into_inner());
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Writes storage history indices for the given blocks.
|
||||
/// Accumulates storage history indices for the given blocks.
|
||||
///
|
||||
/// Indices are accumulated in `ctx.pending_history` and materialized at commit time
|
||||
/// by [`RocksDBProvider::commit_pending_history`]. This ensures each key is written
|
||||
/// exactly once, working around `RocksDB` batch reads not seeing uncommitted writes.
|
||||
///
|
||||
/// Only storage slots with actual value changes are included, matching MDBX behavior.
|
||||
#[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
|
||||
fn write_storage_history<N: reth_node_types::NodePrimitives>(
|
||||
&self,
|
||||
blocks: &[ExecutedBlock<N>],
|
||||
ctx: &RocksDBWriteCtx,
|
||||
) -> ProviderResult<()> {
|
||||
let mut batch = self.batch();
|
||||
let mut storage_history: BTreeMap<(Address, B256), Vec<u64>> = BTreeMap::new();
|
||||
for (block_idx, block) in blocks.iter().enumerate() {
|
||||
let block_number = ctx.first_block_number + block_idx as u64;
|
||||
let mut local: BTreeMap<(Address, B256), Vec<u64>> = BTreeMap::new();
|
||||
for block in blocks {
|
||||
let block_number = block.recovered_block().number();
|
||||
let bundle = &block.execution_outcome().state;
|
||||
for (&address, account) in bundle.state() {
|
||||
for &slot in account.storage.keys() {
|
||||
let key = B256::new(slot.to_be_bytes());
|
||||
storage_history.entry((address, key)).or_default().push(block_number);
|
||||
for (&slot, storage_slot) in &account.storage {
|
||||
if storage_slot.is_changed() {
|
||||
let key = B256::new(slot.to_be_bytes());
|
||||
local.entry((address, key)).or_default().push(block_number);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Write storage history using proper shard append logic
|
||||
for ((address, slot), indices) in storage_history {
|
||||
let mut pending = ctx.pending_history.lock();
|
||||
for ((address, slot), mut indices) in local {
|
||||
pending.storages.entry((address, slot)).or_default().append(&mut indices);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Materializes pending history writes into `RocksDB` shards and commits them.
|
||||
///
|
||||
/// This is called at provider commit time, AFTER MDBX has committed. Each key is
|
||||
/// written exactly once, ensuring [`RocksDBBatch::append_account_history_shard`] and
|
||||
/// [`RocksDBBatch::append_storage_history_shard`] see committed state.
|
||||
pub(crate) fn commit_pending_history(
|
||||
&self,
|
||||
history: PendingHistoryWrites,
|
||||
) -> ProviderResult<()> {
|
||||
if history.accounts.is_empty() && history.storages.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let mut batch = self.batch();
|
||||
|
||||
// Write account history shards
|
||||
for (address, mut indices) in history.accounts {
|
||||
// Sort and dedup in case of duplicate block numbers from multiple save_blocks calls
|
||||
indices.sort_unstable();
|
||||
indices.dedup();
|
||||
batch.append_account_history_shard(address, indices)?;
|
||||
}
|
||||
|
||||
// Write storage history shards
|
||||
for ((address, slot), mut indices) in history.storages {
|
||||
indices.sort_unstable();
|
||||
indices.dedup();
|
||||
batch.append_storage_history_shard(address, slot, indices)?;
|
||||
}
|
||||
ctx.pending_batches.lock().push(batch.into_inner());
|
||||
|
||||
if !batch.is_empty() {
|
||||
batch.commit()?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@@ -1521,10 +1596,14 @@ impl<'a> RocksDBBatch<'a> {
|
||||
/// - Iteration over uncommitted data
|
||||
///
|
||||
/// Note: `Transaction` is `Send` but NOT `Sync`. This wrapper does not implement
|
||||
/// `DbTx`/`DbTxMut` traits directly; use RocksDB-specific methods instead.
|
||||
/// `DbTx`/`DbTxMut` traits directly; use `RocksDB`-specific methods instead.
|
||||
pub struct RocksTx<'db> {
|
||||
inner: Transaction<'db, OptimisticTransactionDB>,
|
||||
provider: &'db RocksDBProvider,
|
||||
/// When true, assume `RocksDB` has complete history (like MDBX) and return `NotYetWritten`
|
||||
/// when querying before the first history entry. When false (default), return
|
||||
/// `MaybeInPlainState` for hybrid storage safety.
|
||||
assume_history_complete: bool,
|
||||
}
|
||||
|
||||
impl fmt::Debug for RocksTx<'_> {
|
||||
@@ -1534,6 +1613,20 @@ impl fmt::Debug for RocksTx<'_> {
|
||||
}
|
||||
|
||||
impl<'db> RocksTx<'db> {
|
||||
/// Marks this transaction as reading from a `RocksDB` instance with complete history.
///
/// Once set, history queries return `NotYetWritten` (like MDBX) instead of
/// `MaybeInPlainState` when the queried block is before the first history entry.
///
/// WARNING: Only use in tests where `RocksDB` has complete history (identical to MDBX).
/// In production, `RocksDB` may have partial history, so this flag would cause incorrect
/// behavior (treating accounts as non-existent when they exist in MDBX).
///
/// Consumes `self` and returns it with the flag set, so the returned value must be kept:
/// discarding it drops the transaction.
#[cfg(test)]
#[must_use = "this consumes the transaction and returns it with the flag set"]
pub const fn with_assume_history_complete(mut self) -> Self {
    // Plain field assignment keeps the method usable as a `const fn`.
    self.assume_history_complete = true;
    self
}
|
||||
|
||||
/// Gets a value from the specified table. Sees uncommitted writes in this transaction.
|
||||
pub fn get<T: Table>(&self, key: T::Key) -> ProviderResult<Option<T::Value>> {
|
||||
let encoded_key = key.encode();
|
||||
@@ -1700,13 +1793,19 @@ impl<'db> RocksTx<'db> {
|
||||
where
|
||||
T: Table<Value = BlockNumberList>,
|
||||
{
|
||||
// History may be pruned if a lowest available block is set.
|
||||
let is_maybe_pruned = lowest_available_block_number.is_some();
|
||||
// When RocksDB can't find history for an address, we must fall back to plain state.
|
||||
//
|
||||
// This is critical for hybrid storage: RocksDB only contains history indices for blocks
|
||||
// written AFTER it was enabled. Accounts modified before that exist only in MDBX.
|
||||
// Returning `NotYetWritten` would incorrectly treat them as non-existent (nonce 0).
|
||||
//
|
||||
// When `assume_history_complete` is true (for tests with identical data), return
|
||||
// `NotYetWritten` to match MDBX semantics.
|
||||
let fallback = || {
|
||||
Ok(if is_maybe_pruned {
|
||||
HistoryInfo::MaybeInPlainState
|
||||
} else {
|
||||
Ok(if self.assume_history_complete {
|
||||
HistoryInfo::NotYetWritten
|
||||
} else {
|
||||
HistoryInfo::MaybeInPlainState
|
||||
})
|
||||
};
|
||||
|
||||
@@ -1757,11 +1856,27 @@ impl<'db> RocksTx<'db> {
|
||||
false
|
||||
};
|
||||
|
||||
Ok(HistoryInfo::from_lookup(
|
||||
// For RocksDB, we must handle `is_before_first_write` specially.
|
||||
//
|
||||
// When RocksDB says the target block is before the first recorded write:
|
||||
// - MDBX would return `NotYetWritten` (account doesn't exist) because MDBX has full history
|
||||
// - RocksDB should return `MaybeInPlainState` because the account may exist in MDBX from
|
||||
// before RocksDB was enabled
|
||||
//
|
||||
// We use `from_lookup` but then override `NotYetWritten` -> `MaybeInPlainState`
|
||||
// unless `assume_history_complete` is true (for tests with identical data).
|
||||
let result = HistoryInfo::from_lookup(
|
||||
found_block,
|
||||
is_before_first_write,
|
||||
lowest_available_block_number,
|
||||
))
|
||||
);
|
||||
|
||||
Ok(match result {
|
||||
HistoryInfo::NotYetWritten if !self.assume_history_complete => {
|
||||
HistoryInfo::MaybeInPlainState
|
||||
}
|
||||
other => other,
|
||||
})
|
||||
}
|
||||
|
||||
/// Returns an error if the raw iterator is in an invalid state due to an I/O error.
|
||||
@@ -2257,7 +2372,7 @@ mod tests {
|
||||
|
||||
/// Tests the edge case where block < `lowest_available_block_number`.
|
||||
/// This case cannot be tested via `HistoricalStateProviderRef` (which errors before lookup),
|
||||
/// so we keep this RocksDB-specific test to verify the low-level behavior.
|
||||
/// so we keep this `RocksDB`-specific test to verify the low-level behavior.
|
||||
#[test]
|
||||
fn test_account_history_info_pruned_before_first_entry() {
|
||||
let temp_dir = TempDir::new().unwrap();
|
||||
|
||||
@@ -32,6 +32,13 @@ pub struct RocksDBTableStats {
|
||||
pub pending_compaction_bytes: u64,
|
||||
}
|
||||
|
||||
/// Placeholder for pending history writes; the stub variant carries no data.
#[derive(Debug, Default)]
pub(crate) struct PendingHistoryWrites;

/// Shared, lock-guarded handle to [`PendingHistoryWrites`] (stub).
pub(crate) type PendingHistory = Arc<Mutex<PendingHistoryWrites>>;
|
||||
|
||||
/// Context for `RocksDB` block writes (stub).
|
||||
#[derive(Debug, Clone)]
|
||||
#[allow(dead_code)]
|
||||
@@ -44,6 +51,8 @@ pub(crate) struct RocksDBWriteCtx {
|
||||
pub storage_settings: StorageSettings,
|
||||
/// Pending batches (stub - unused).
|
||||
pub pending_batches: PendingRocksDBBatches,
|
||||
/// Pending history writes (stub - unused).
|
||||
pub pending_history: PendingHistory,
|
||||
}
|
||||
|
||||
/// A stub `RocksDB` provider.
|
||||
|
||||
Reference in New Issue
Block a user