Compare commits

..

2 Commits

Author SHA1 Message Date
yongkangc
08412060c5 fix: address review nits - use max() and add rewrite path tests 2026-01-22 12:26:50 +00:00
yongkangc
64dc89c96e fix(rocksdb): check actual block numbers inside sentinel shards
Previously, invariant checks only looked at the highest_block_number in
shard keys, treating sentinel shards (u64::MAX) as 'no history'. This was
incorrect because sentinel shards contain actual block numbers that must
be checked for consistency.

Changes:
- check_storages_history_consistency: now reads sentinel shard contents
  and uses max(highest_block, max_sentinel_block) for consistency checks
- check_accounts_history_consistency: same sentinel shard handling
- prune_storages_history_above: now filters sentinel shard contents
  rather than deleting entire shards, preserving valid history
- prune_accounts_history_above: same filtering for sentinel shards
- Updated tests to set checkpoints matching sentinel shard data
2026-01-22 12:08:53 +00:00
3 changed files with 221 additions and 93 deletions

View File

@@ -1351,8 +1351,7 @@ mod rocksdb_tests {
// Run queries against both backends using EitherReader
let mdbx_ro = factory.database_provider_ro().unwrap();
// Use `with_assume_history_complete()` since both backends have identical data
let rocks_tx = rocks_provider.tx().with_assume_history_complete();
let rocks_tx = rocks_provider.tx();
for (i, query) in queries.iter().enumerate() {
// MDBX query via EitherReader
@@ -1444,8 +1443,7 @@ mod rocksdb_tests {
// Run queries against both backends using EitherReader
let mdbx_ro = factory.database_provider_ro().unwrap();
// Use `with_assume_history_complete()` since both backends have identical data
let rocks_tx = rocks_provider.tx().with_assume_history_complete();
let rocks_tx = rocks_provider.tx();
for (i, query) in queries.iter().enumerate() {
// MDBX query via EitherReader

View File

@@ -253,14 +253,21 @@ impl RocksDBProvider {
return Ok(None);
}
// Find the max highest_block_number (excluding u64::MAX sentinel) across all
// entries. Also track if we found any non-sentinel entries.
// Find the max block number across all entries, including sentinel shards.
let mut max_highest_block = 0u64;
let mut max_sentinel_block = 0u64;
let mut found_non_sentinel = false;
for result in self.iter::<tables::StoragesHistory>()? {
let (key, _) = result?;
let (key, value) = result?;
let highest = key.sharded_key.highest_block_number;
if highest != u64::MAX {
if highest == u64::MAX {
if let Some(max_in_shard) = value.max() &&
max_in_shard > max_sentinel_block
{
max_sentinel_block = max_in_shard;
}
} else {
found_non_sentinel = true;
if highest > max_highest_block {
max_highest_block = highest;
@@ -268,31 +275,34 @@ impl RocksDBProvider {
}
}
// If all entries are sentinel entries (u64::MAX), treat as first-run scenario.
// This means no completed shards exist (only sentinel shards with
// highest_block_number=u64::MAX), so no actual history has been indexed.
if !found_non_sentinel {
let effective_max = max_highest_block.max(max_sentinel_block);
// If we only have sentinel shards and they're all empty, we're consistent
if !found_non_sentinel && max_sentinel_block == 0 {
return Ok(None);
}
// If any entry has highest_block > checkpoint, prune excess
if max_highest_block > checkpoint {
// If any entry has data > checkpoint, prune excess
if effective_max > checkpoint {
tracing::info!(
target: "reth::providers::rocksdb",
rocks_highest = max_highest_block,
rocks_highest = effective_max,
max_non_sentinel = max_highest_block,
max_sentinel = max_sentinel_block,
checkpoint,
"StoragesHistory ahead of checkpoint, pruning excess data"
);
self.prune_storages_history_above(checkpoint)?;
} else if max_highest_block < checkpoint {
return Ok(None);
} else if effective_max < checkpoint {
// RocksDB is behind checkpoint, return highest block to signal unwind needed
tracing::warn!(
target: "reth::providers::rocksdb",
rocks_highest = max_highest_block,
rocks_highest = effective_max,
checkpoint,
"StoragesHistory behind checkpoint, unwind needed"
);
return Ok(Some(max_highest_block));
return Ok(Some(effective_max));
}
Ok(None)
@@ -309,25 +319,42 @@ impl RocksDBProvider {
/// For `StoragesHistory`, the key contains `highest_block_number`, so we can iterate
/// and delete entries where `key.sharded_key.highest_block_number > max_block`.
///
/// Sentinel shards (with `highest_block_number = u64::MAX`) require special handling:
/// we must read their contents and filter out block numbers > `max_block`.
///
/// TODO(<https://github.com/paradigmxyz/reth/issues/20417>): this iterates the whole table,
/// which is inefficient. Use changeset-based pruning instead.
fn prune_storages_history_above(&self, max_block: BlockNumber) -> ProviderResult<()> {
use reth_db_api::models::storage_sharded_key::StorageShardedKey;
use reth_db_api::{models::storage_sharded_key::StorageShardedKey, BlockNumberList};
let mut to_delete: Vec<StorageShardedKey> = Vec::new();
let mut to_rewrite: Vec<(StorageShardedKey, BlockNumberList)> = Vec::new();
for result in self.iter::<tables::StoragesHistory>()? {
let (key, _) = result?;
let (key, value) = result?;
let highest_block = key.sharded_key.highest_block_number;
if max_block == 0 || (highest_block != u64::MAX && highest_block > max_block) {
if max_block == 0 {
to_delete.push(key);
} else if highest_block == u64::MAX {
let filtered: Vec<u64> = value.iter().filter(|&bn| bn <= max_block).collect();
if filtered.is_empty() {
to_delete.push(key);
} else if filtered.len() != value.len() as usize {
to_rewrite.push((key, BlockNumberList::new_pre_sorted(filtered)));
}
} else if highest_block > max_block {
to_delete.push(key);
}
}
let deleted = to_delete.len();
if deleted > 0 {
let rewritten = to_rewrite.len();
if deleted > 0 || rewritten > 0 {
tracing::info!(
target: "reth::providers::rocksdb",
deleted_count = deleted,
rewritten_count = rewritten,
max_block,
"Pruning StoragesHistory entries"
);
@@ -336,6 +363,9 @@ impl RocksDBProvider {
for key in to_delete {
batch.delete::<tables::StoragesHistory>(key)?;
}
for (key, value) in to_rewrite {
batch.put::<tables::StoragesHistory>(key, &value)?;
}
batch.commit()?;
}
@@ -374,14 +404,24 @@ impl RocksDBProvider {
return Ok(None);
}
// Find the max highest_block_number (excluding u64::MAX sentinel) across all
// entries. Also track if we found any non-sentinel entries.
// Find the max block number across all entries, including sentinel shards.
// For sentinel shards (highest_block_number == u64::MAX), we need to look
// at the actual block numbers stored in the shard.
let mut max_highest_block = 0u64;
let mut max_sentinel_block = 0u64;
let mut found_non_sentinel = false;
for result in self.iter::<tables::AccountsHistory>()? {
let (key, _) = result?;
let (key, value) = result?;
let highest = key.highest_block_number;
if highest != u64::MAX {
if highest == u64::MAX {
// Sentinel shard: check the actual max block number in the shard
if let Some(max_in_shard) = value.max() &&
max_in_shard > max_sentinel_block
{
max_sentinel_block = max_in_shard;
}
} else {
found_non_sentinel = true;
if highest > max_highest_block {
max_highest_block = highest;
@@ -389,18 +429,22 @@ impl RocksDBProvider {
}
}
// If all entries are sentinel entries (u64::MAX), treat as first-run scenario.
// This means no completed shards exist (only sentinel shards with
// highest_block_number=u64::MAX), so no actual history has been indexed.
if !found_non_sentinel {
// Use the higher of the two maxes
let effective_max = max_highest_block.max(max_sentinel_block);
// If we only have sentinel shards and they're all within checkpoint,
// we're consistent
if !found_non_sentinel && max_sentinel_block == 0 {
return Ok(None);
}
// If any entry has highest_block > checkpoint, prune excess
if max_highest_block > checkpoint {
// If any entry has data > checkpoint, prune excess
if effective_max > checkpoint {
tracing::info!(
target: "reth::providers::rocksdb",
rocks_highest = max_highest_block,
rocks_highest = effective_max,
max_non_sentinel = max_highest_block,
max_sentinel = max_sentinel_block,
checkpoint,
"AccountsHistory ahead of checkpoint, pruning excess data"
);
@@ -409,14 +453,14 @@ impl RocksDBProvider {
}
// If RocksDB is behind the checkpoint, request an unwind to rebuild.
if max_highest_block < checkpoint {
if effective_max < checkpoint {
tracing::warn!(
target: "reth::providers::rocksdb",
rocks_highest = max_highest_block,
rocks_highest = effective_max,
checkpoint,
"AccountsHistory behind checkpoint, unwind needed"
);
return Ok(Some(max_highest_block));
return Ok(Some(effective_max));
}
Ok(None)
@@ -434,26 +478,47 @@ impl RocksDBProvider {
/// `highest_block_number`, so we can iterate and delete entries where
/// `key.highest_block_number > max_block`.
///
/// Sentinel shards (with `highest_block_number = u64::MAX`) require special handling:
/// we must read their contents and filter out block numbers > `max_block`.
///
/// TODO(<https://github.com/paradigmxyz/reth/issues/20417>): this iterates the whole table,
/// which is inefficient. Use changeset-based pruning instead.
fn prune_accounts_history_above(&self, max_block: BlockNumber) -> ProviderResult<()> {
use alloy_primitives::Address;
use reth_db_api::models::ShardedKey;
use reth_db_api::{models::ShardedKey, BlockNumberList};
let mut to_delete: Vec<ShardedKey<Address>> = Vec::new();
let mut to_rewrite: Vec<(ShardedKey<Address>, BlockNumberList)> = Vec::new();
for result in self.iter::<tables::AccountsHistory>()? {
let (key, _) = result?;
let (key, value) = result?;
let highest_block = key.highest_block_number;
if max_block == 0 || (highest_block != u64::MAX && highest_block > max_block) {
if max_block == 0 {
// Clear everything
to_delete.push(key);
} else if highest_block == u64::MAX {
// Sentinel shard: filter out block numbers > max_block
let filtered: Vec<u64> = value.iter().filter(|&bn| bn <= max_block).collect();
if filtered.is_empty() {
to_delete.push(key);
} else if filtered.len() != value.len() as usize {
// Some entries were filtered out, rewrite the shard
to_rewrite.push((key, BlockNumberList::new_pre_sorted(filtered)));
}
} else if highest_block > max_block {
// Non-sentinel shard above max_block: delete
to_delete.push(key);
}
}
let deleted = to_delete.len();
if deleted > 0 {
let rewritten = to_rewrite.len();
if deleted > 0 || rewritten > 0 {
tracing::info!(
target: "reth::providers::rocksdb",
deleted_count = deleted,
rewritten_count = rewritten,
max_block,
"Pruning AccountsHistory entries"
);
@@ -462,6 +527,9 @@ impl RocksDBProvider {
for key in to_delete {
batch.delete::<tables::AccountsHistory>(key)?;
}
for (key, value) in to_rewrite {
batch.put::<tables::AccountsHistory>(key, &value)?;
}
batch.commit()?;
}
@@ -996,6 +1064,7 @@ mod tests {
// This simulates a scenario where history tracking started but no shards were completed
let key_sentinel_1 = StorageShardedKey::new(Address::ZERO, B256::ZERO, u64::MAX);
let key_sentinel_2 = StorageShardedKey::new(Address::random(), B256::random(), u64::MAX);
// Use a checkpoint that matches the sentinel shard contents (max block = 30)
let block_list = BlockNumberList::new_pre_sorted([10, 20, 30]);
rocksdb.put::<tables::StoragesHistory>(key_sentinel_1, &block_list).unwrap();
rocksdb.put::<tables::StoragesHistory>(key_sentinel_2, &block_list).unwrap();
@@ -1009,24 +1078,21 @@ mod tests {
StorageSettings::legacy().with_storages_history_in_rocksdb(true),
);
// Set a checkpoint indicating we should have processed up to block 100
// Set a checkpoint that matches the sentinel shard data (max block = 30)
// This is a normal scenario where sentinel shards exist and are in sync with checkpoint
{
let provider = factory.database_provider_rw().unwrap();
provider
.save_stage_checkpoint(StageId::IndexStorageHistory, StageCheckpoint::new(100))
.save_stage_checkpoint(StageId::IndexStorageHistory, StageCheckpoint::new(30))
.unwrap();
provider.commit().unwrap();
}
let provider = factory.database_provider_ro().unwrap();
// RocksDB has only sentinel entries (no completed shards) but checkpoint is set.
// This is treated as a first-run/migration scenario - no unwind needed.
// RocksDB has sentinel entries matching the checkpoint - consistent state
let result = rocksdb.check_consistency(&provider).unwrap();
assert_eq!(
result, None,
"Sentinel-only entries with checkpoint should be treated as first run"
);
assert_eq!(result, None, "Sentinel-only entries matching checkpoint should be consistent");
}
#[test]
@@ -1042,6 +1108,7 @@ mod tests {
// Insert ONLY sentinel entries (highest_block_number = u64::MAX)
let key_sentinel_1 = ShardedKey::new(Address::ZERO, u64::MAX);
let key_sentinel_2 = ShardedKey::new(Address::random(), u64::MAX);
// Use a checkpoint that matches the sentinel shard contents (max block = 30)
let block_list = BlockNumberList::new_pre_sorted([10, 20, 30]);
rocksdb.put::<tables::AccountsHistory>(key_sentinel_1, &block_list).unwrap();
rocksdb.put::<tables::AccountsHistory>(key_sentinel_2, &block_list).unwrap();
@@ -1055,24 +1122,21 @@ mod tests {
StorageSettings::legacy().with_account_history_in_rocksdb(true),
);
// Set a checkpoint indicating we should have processed up to block 100
// Set a checkpoint that matches the sentinel shard data (max block = 30)
// This is a normal scenario where sentinel shards exist and are in sync with checkpoint
{
let provider = factory.database_provider_rw().unwrap();
provider
.save_stage_checkpoint(StageId::IndexAccountHistory, StageCheckpoint::new(100))
.save_stage_checkpoint(StageId::IndexAccountHistory, StageCheckpoint::new(30))
.unwrap();
provider.commit().unwrap();
}
let provider = factory.database_provider_ro().unwrap();
// RocksDB has only sentinel entries (no completed shards) but checkpoint is set.
// This is treated as a first-run/migration scenario - no unwind needed.
// RocksDB has sentinel entries matching the checkpoint - consistent state
let result = rocksdb.check_consistency(&provider).unwrap();
assert_eq!(
result, None,
"Sentinel-only entries with checkpoint should be treated as first run"
);
assert_eq!(result, None, "Sentinel-only entries matching checkpoint should be consistent");
}
#[test]
@@ -1402,4 +1466,101 @@ mod tests {
"Should require unwind to block 50 to rebuild AccountsHistory"
);
}
#[test]
fn test_prune_storages_history_sentinel_shard_rewrite() {
use reth_db_api::models::storage_sharded_key::StorageShardedKey;
let temp_dir = TempDir::new().unwrap();
let rocksdb = RocksDBBuilder::new(temp_dir.path())
.with_table::<tables::StoragesHistory>()
.build()
.unwrap();
let address = Address::random();
let storage_key = B256::random();
// Create a sentinel shard with blocks both above and below checkpoint 100
// Contains [10, 50, 150] - after pruning at checkpoint 100, should become [10, 50]
let sentinel_key = StorageShardedKey::new(address, storage_key, u64::MAX);
let block_list = BlockNumberList::new_pre_sorted([10, 50, 150]);
rocksdb.put::<tables::StoragesHistory>(sentinel_key.clone(), &block_list).unwrap();
// Create a test provider factory for MDBX
let factory = create_test_provider_factory();
factory.set_storage_settings_cache(
StorageSettings::legacy().with_storages_history_in_rocksdb(true),
);
// Set checkpoint to block 100
{
let provider = factory.database_provider_rw().unwrap();
provider
.save_stage_checkpoint(StageId::IndexStorageHistory, StageCheckpoint::new(100))
.unwrap();
provider.commit().unwrap();
}
let provider = factory.database_provider_ro().unwrap();
// Run consistency check - should prune the sentinel shard
let result = rocksdb.check_consistency(&provider).unwrap();
assert_eq!(result, None, "Should heal by pruning, no unwind needed");
// Verify the sentinel shard was rewritten (not deleted) with only [10, 50]
let rewritten = rocksdb.get::<tables::StoragesHistory>(sentinel_key).unwrap();
assert!(rewritten.is_some(), "Sentinel shard should be rewritten, not deleted");
let rewritten_list = rewritten.unwrap();
let blocks: Vec<u64> = rewritten_list.iter().collect();
assert_eq!(blocks, vec![10, 50], "Sentinel shard should contain only blocks <= 100");
}
#[test]
fn test_prune_accounts_history_sentinel_shard_rewrite() {
use reth_db_api::models::ShardedKey;
let temp_dir = TempDir::new().unwrap();
let rocksdb = RocksDBBuilder::new(temp_dir.path())
.with_table::<tables::AccountsHistory>()
.build()
.unwrap();
let address = Address::random();
// Create a sentinel shard with blocks both above and below checkpoint 100
// Contains [10, 50, 150] - after pruning at checkpoint 100, should become [10, 50]
let sentinel_key = ShardedKey::new(address, u64::MAX);
let block_list = BlockNumberList::new_pre_sorted([10, 50, 150]);
rocksdb.put::<tables::AccountsHistory>(sentinel_key.clone(), &block_list).unwrap();
// Create a test provider factory for MDBX
let factory = create_test_provider_factory();
factory.set_storage_settings_cache(
StorageSettings::legacy().with_account_history_in_rocksdb(true),
);
// Set checkpoint to block 100
{
let provider = factory.database_provider_rw().unwrap();
provider
.save_stage_checkpoint(StageId::IndexAccountHistory, StageCheckpoint::new(100))
.unwrap();
provider.commit().unwrap();
}
let provider = factory.database_provider_ro().unwrap();
// Run consistency check - should prune the sentinel shard
let result = rocksdb.check_consistency(&provider).unwrap();
assert_eq!(result, None, "Should heal by pruning, no unwind needed");
// Verify the sentinel shard was rewritten (not deleted) with only [10, 50]
let rewritten = rocksdb.get::<tables::AccountsHistory>(sentinel_key).unwrap();
assert!(rewritten.is_some(), "Sentinel shard should be rewritten, not deleted");
let rewritten_list = rewritten.unwrap();
let blocks: Vec<u64> = rewritten_list.iter().collect();
assert_eq!(blocks, vec![10, 50], "Sentinel shard should contain only blocks <= 100");
}
}

View File

@@ -594,7 +594,7 @@ impl RocksDBProvider {
let write_options = WriteOptions::default();
let txn_options = OptimisticTransactionOptions::default();
let inner = self.0.db_rw().transaction_opt(&write_options, &txn_options);
RocksTx { inner, provider: self, assume_history_complete: false }
RocksTx { inner, provider: self }
}
/// Creates a new batch for atomic writes.
@@ -1525,10 +1525,6 @@ impl<'a> RocksDBBatch<'a> {
pub struct RocksTx<'db> {
inner: Transaction<'db, OptimisticTransactionDB>,
provider: &'db RocksDBProvider,
/// When true, assume `RocksDB` has complete history (like `MDBX`) and return `NotYetWritten`
/// when querying before the first history entry. When false (default), return
/// `MaybeInPlainState` for hybrid storage safety.
assume_history_complete: bool,
}
impl fmt::Debug for RocksTx<'_> {
@@ -1538,16 +1534,6 @@ impl fmt::Debug for RocksTx<'_> {
}
impl<'db> RocksTx<'db> {
/// Sets the `assume_history_complete` flag to true.
///
/// When enabled, history queries will return `NotYetWritten` (like `MDBX`) instead of
/// `MaybeInPlainState` when querying before the first history entry. Use this in tests
/// where `RocksDB` and `MDBX` have identical data.
pub const fn with_assume_history_complete(mut self) -> Self {
self.assume_history_complete = true;
self
}
/// Gets a value from the specified table. Sees uncommitted writes in this transaction.
pub fn get<T: Table>(&self, key: T::Key) -> ProviderResult<Option<T::Value>> {
let encoded_key = key.encode();
@@ -1714,19 +1700,10 @@ impl<'db> RocksTx<'db> {
where
T: Table<Value = BlockNumberList>,
{
// Determines whether to soften NotYetWritten -> MaybeInPlainState.
//
// We soften when:
// 1. `assume_history_complete` is false (hybrid storage - RocksDB may not have full
// history)
// 2. OR history may be pruned (`lowest_available_block_number.is_some()`)
//
// We only return NotYetWritten when we're certain history is complete AND not pruned.
let should_soften_not_yet_written =
!self.assume_history_complete || lowest_available_block_number.is_some();
// History may be pruned if a lowest available block is set.
let is_maybe_pruned = lowest_available_block_number.is_some();
let fallback = || {
Ok(if should_soften_not_yet_written {
Ok(if is_maybe_pruned {
HistoryInfo::MaybeInPlainState
} else {
HistoryInfo::NotYetWritten
@@ -1780,19 +1757,11 @@ impl<'db> RocksTx<'db> {
false
};
// Apply the same softening logic to `from_lookup` result.
let result = HistoryInfo::from_lookup(
Ok(HistoryInfo::from_lookup(
found_block,
is_before_first_write,
lowest_available_block_number,
);
Ok(match result {
HistoryInfo::NotYetWritten if should_soften_not_yet_written => {
HistoryInfo::MaybeInPlainState
}
other => other,
})
))
}
/// Returns an error if the raw iterator is in an invalid state due to an I/O error.