Compare commits

...

2 Commits
t4 ... wt-pr1

Author SHA1 Message Date
yongkangc
08412060c5 fix: address review nits - use max() and add rewrite path tests 2026-01-22 12:26:50 +00:00
yongkangc
64dc89c96e fix(rocksdb): check actual block numbers inside sentinel shards
Previously, invariant checks only looked at the highest_block_number in
shard keys, treating sentinel shards (u64::MAX) as 'no history'. This was
incorrect because sentinel shards contain actual block numbers that must
be checked for consistency.

Changes:
- check_storages_history_consistency: now reads sentinel shard contents
  and uses max(highest_block, max_sentinel_block) for consistency checks
- check_accounts_history_consistency: same sentinel shard handling
- prune_storages_history_above: now filters sentinel shard contents
  rather than deleting entire shards, preserving valid history
- prune_accounts_history_above: same filtering for sentinel shards
- Updated tests to set checkpoints matching sentinel shard data
2026-01-22 12:08:53 +00:00

View File

@@ -253,14 +253,21 @@ impl RocksDBProvider {
return Ok(None);
}
// Find the max highest_block_number (excluding u64::MAX sentinel) across all
// entries. Also track if we found any non-sentinel entries.
// Find the max block number across all entries, including sentinel shards.
let mut max_highest_block = 0u64;
let mut max_sentinel_block = 0u64;
let mut found_non_sentinel = false;
for result in self.iter::<tables::StoragesHistory>()? {
let (key, _) = result?;
let (key, value) = result?;
let highest = key.sharded_key.highest_block_number;
if highest != u64::MAX {
if highest == u64::MAX {
if let Some(max_in_shard) = value.max() &&
max_in_shard > max_sentinel_block
{
max_sentinel_block = max_in_shard;
}
} else {
found_non_sentinel = true;
if highest > max_highest_block {
max_highest_block = highest;
@@ -268,31 +275,34 @@ impl RocksDBProvider {
}
}
// If all entries are sentinel entries (u64::MAX), treat as first-run scenario.
// This means no completed shards exist (only sentinel shards with
// highest_block_number=u64::MAX), so no actual history has been indexed.
if !found_non_sentinel {
let effective_max = max_highest_block.max(max_sentinel_block);
// If we only have sentinel shards and they're all empty, we're consistent
if !found_non_sentinel && max_sentinel_block == 0 {
return Ok(None);
}
// If any entry has highest_block > checkpoint, prune excess
if max_highest_block > checkpoint {
// If any entry has data > checkpoint, prune excess
if effective_max > checkpoint {
tracing::info!(
target: "reth::providers::rocksdb",
rocks_highest = max_highest_block,
rocks_highest = effective_max,
max_non_sentinel = max_highest_block,
max_sentinel = max_sentinel_block,
checkpoint,
"StoragesHistory ahead of checkpoint, pruning excess data"
);
self.prune_storages_history_above(checkpoint)?;
} else if max_highest_block < checkpoint {
return Ok(None);
} else if effective_max < checkpoint {
// RocksDB is behind checkpoint, return highest block to signal unwind needed
tracing::warn!(
target: "reth::providers::rocksdb",
rocks_highest = max_highest_block,
rocks_highest = effective_max,
checkpoint,
"StoragesHistory behind checkpoint, unwind needed"
);
return Ok(Some(max_highest_block));
return Ok(Some(effective_max));
}
Ok(None)
@@ -309,25 +319,42 @@ impl RocksDBProvider {
/// For `StoragesHistory`, the key contains `highest_block_number`, so we can iterate
/// and delete entries where `key.sharded_key.highest_block_number > max_block`.
///
/// Sentinel shards (with `highest_block_number = u64::MAX`) require special handling:
/// we must read their contents and filter out block numbers > `max_block`.
///
/// TODO(<https://github.com/paradigmxyz/reth/issues/20417>): this iterates the whole table,
/// which is inefficient. Use changeset-based pruning instead.
fn prune_storages_history_above(&self, max_block: BlockNumber) -> ProviderResult<()> {
use reth_db_api::models::storage_sharded_key::StorageShardedKey;
use reth_db_api::{models::storage_sharded_key::StorageShardedKey, BlockNumberList};
let mut to_delete: Vec<StorageShardedKey> = Vec::new();
let mut to_rewrite: Vec<(StorageShardedKey, BlockNumberList)> = Vec::new();
for result in self.iter::<tables::StoragesHistory>()? {
let (key, _) = result?;
let (key, value) = result?;
let highest_block = key.sharded_key.highest_block_number;
if max_block == 0 || (highest_block != u64::MAX && highest_block > max_block) {
if max_block == 0 {
to_delete.push(key);
} else if highest_block == u64::MAX {
let filtered: Vec<u64> = value.iter().filter(|&bn| bn <= max_block).collect();
if filtered.is_empty() {
to_delete.push(key);
} else if filtered.len() != value.len() as usize {
to_rewrite.push((key, BlockNumberList::new_pre_sorted(filtered)));
}
} else if highest_block > max_block {
to_delete.push(key);
}
}
let deleted = to_delete.len();
if deleted > 0 {
let rewritten = to_rewrite.len();
if deleted > 0 || rewritten > 0 {
tracing::info!(
target: "reth::providers::rocksdb",
deleted_count = deleted,
rewritten_count = rewritten,
max_block,
"Pruning StoragesHistory entries"
);
@@ -336,6 +363,9 @@ impl RocksDBProvider {
for key in to_delete {
batch.delete::<tables::StoragesHistory>(key)?;
}
for (key, value) in to_rewrite {
batch.put::<tables::StoragesHistory>(key, &value)?;
}
batch.commit()?;
}
@@ -374,14 +404,24 @@ impl RocksDBProvider {
return Ok(None);
}
// Find the max highest_block_number (excluding u64::MAX sentinel) across all
// entries. Also track if we found any non-sentinel entries.
// Find the max block number across all entries, including sentinel shards.
// For sentinel shards (highest_block_number == u64::MAX), we need to look
// at the actual block numbers stored in the shard.
let mut max_highest_block = 0u64;
let mut max_sentinel_block = 0u64;
let mut found_non_sentinel = false;
for result in self.iter::<tables::AccountsHistory>()? {
let (key, _) = result?;
let (key, value) = result?;
let highest = key.highest_block_number;
if highest != u64::MAX {
if highest == u64::MAX {
// Sentinel shard: check the actual max block number in the shard
if let Some(max_in_shard) = value.max() &&
max_in_shard > max_sentinel_block
{
max_sentinel_block = max_in_shard;
}
} else {
found_non_sentinel = true;
if highest > max_highest_block {
max_highest_block = highest;
@@ -389,18 +429,22 @@ impl RocksDBProvider {
}
}
// If all entries are sentinel entries (u64::MAX), treat as first-run scenario.
// This means no completed shards exist (only sentinel shards with
// highest_block_number=u64::MAX), so no actual history has been indexed.
if !found_non_sentinel {
// Use the higher of the two maxes
let effective_max = max_highest_block.max(max_sentinel_block);
// If we only have sentinel shards and they're all within checkpoint,
// we're consistent
if !found_non_sentinel && max_sentinel_block == 0 {
return Ok(None);
}
// If any entry has highest_block > checkpoint, prune excess
if max_highest_block > checkpoint {
// If any entry has data > checkpoint, prune excess
if effective_max > checkpoint {
tracing::info!(
target: "reth::providers::rocksdb",
rocks_highest = max_highest_block,
rocks_highest = effective_max,
max_non_sentinel = max_highest_block,
max_sentinel = max_sentinel_block,
checkpoint,
"AccountsHistory ahead of checkpoint, pruning excess data"
);
@@ -409,14 +453,14 @@ impl RocksDBProvider {
}
// If RocksDB is behind the checkpoint, request an unwind to rebuild.
if max_highest_block < checkpoint {
if effective_max < checkpoint {
tracing::warn!(
target: "reth::providers::rocksdb",
rocks_highest = max_highest_block,
rocks_highest = effective_max,
checkpoint,
"AccountsHistory behind checkpoint, unwind needed"
);
return Ok(Some(max_highest_block));
return Ok(Some(effective_max));
}
Ok(None)
@@ -434,26 +478,47 @@ impl RocksDBProvider {
/// `highest_block_number`, so we can iterate and delete entries where
/// `key.highest_block_number > max_block`.
///
/// Sentinel shards (with `highest_block_number = u64::MAX`) require special handling:
/// we must read their contents and filter out block numbers > `max_block`.
///
/// TODO(<https://github.com/paradigmxyz/reth/issues/20417>): this iterates the whole table,
/// which is inefficient. Use changeset-based pruning instead.
fn prune_accounts_history_above(&self, max_block: BlockNumber) -> ProviderResult<()> {
use alloy_primitives::Address;
use reth_db_api::models::ShardedKey;
use reth_db_api::{models::ShardedKey, BlockNumberList};
let mut to_delete: Vec<ShardedKey<Address>> = Vec::new();
let mut to_rewrite: Vec<(ShardedKey<Address>, BlockNumberList)> = Vec::new();
for result in self.iter::<tables::AccountsHistory>()? {
let (key, _) = result?;
let (key, value) = result?;
let highest_block = key.highest_block_number;
if max_block == 0 || (highest_block != u64::MAX && highest_block > max_block) {
if max_block == 0 {
// Clear everything
to_delete.push(key);
} else if highest_block == u64::MAX {
// Sentinel shard: filter out block numbers > max_block
let filtered: Vec<u64> = value.iter().filter(|&bn| bn <= max_block).collect();
if filtered.is_empty() {
to_delete.push(key);
} else if filtered.len() != value.len() as usize {
// Some entries were filtered out, rewrite the shard
to_rewrite.push((key, BlockNumberList::new_pre_sorted(filtered)));
}
} else if highest_block > max_block {
// Non-sentinel shard above max_block: delete
to_delete.push(key);
}
}
let deleted = to_delete.len();
if deleted > 0 {
let rewritten = to_rewrite.len();
if deleted > 0 || rewritten > 0 {
tracing::info!(
target: "reth::providers::rocksdb",
deleted_count = deleted,
rewritten_count = rewritten,
max_block,
"Pruning AccountsHistory entries"
);
@@ -462,6 +527,9 @@ impl RocksDBProvider {
for key in to_delete {
batch.delete::<tables::AccountsHistory>(key)?;
}
for (key, value) in to_rewrite {
batch.put::<tables::AccountsHistory>(key, &value)?;
}
batch.commit()?;
}
@@ -996,6 +1064,7 @@ mod tests {
// This simulates a scenario where history tracking started but no shards were completed
let key_sentinel_1 = StorageShardedKey::new(Address::ZERO, B256::ZERO, u64::MAX);
let key_sentinel_2 = StorageShardedKey::new(Address::random(), B256::random(), u64::MAX);
// Use a checkpoint that matches the sentinel shard contents (max block = 30)
let block_list = BlockNumberList::new_pre_sorted([10, 20, 30]);
rocksdb.put::<tables::StoragesHistory>(key_sentinel_1, &block_list).unwrap();
rocksdb.put::<tables::StoragesHistory>(key_sentinel_2, &block_list).unwrap();
@@ -1009,24 +1078,21 @@ mod tests {
StorageSettings::legacy().with_storages_history_in_rocksdb(true),
);
// Set a checkpoint indicating we should have processed up to block 100
// Set a checkpoint that matches the sentinel shard data (max block = 30)
// This is a normal scenario where sentinel shards exist and are in sync with checkpoint
{
let provider = factory.database_provider_rw().unwrap();
provider
.save_stage_checkpoint(StageId::IndexStorageHistory, StageCheckpoint::new(100))
.save_stage_checkpoint(StageId::IndexStorageHistory, StageCheckpoint::new(30))
.unwrap();
provider.commit().unwrap();
}
let provider = factory.database_provider_ro().unwrap();
// RocksDB has only sentinel entries (no completed shards) but checkpoint is set.
// This is treated as a first-run/migration scenario - no unwind needed.
// RocksDB has sentinel entries matching the checkpoint - consistent state
let result = rocksdb.check_consistency(&provider).unwrap();
assert_eq!(
result, None,
"Sentinel-only entries with checkpoint should be treated as first run"
);
assert_eq!(result, None, "Sentinel-only entries matching checkpoint should be consistent");
}
#[test]
@@ -1042,6 +1108,7 @@ mod tests {
// Insert ONLY sentinel entries (highest_block_number = u64::MAX)
let key_sentinel_1 = ShardedKey::new(Address::ZERO, u64::MAX);
let key_sentinel_2 = ShardedKey::new(Address::random(), u64::MAX);
// Use a checkpoint that matches the sentinel shard contents (max block = 30)
let block_list = BlockNumberList::new_pre_sorted([10, 20, 30]);
rocksdb.put::<tables::AccountsHistory>(key_sentinel_1, &block_list).unwrap();
rocksdb.put::<tables::AccountsHistory>(key_sentinel_2, &block_list).unwrap();
@@ -1055,24 +1122,21 @@ mod tests {
StorageSettings::legacy().with_account_history_in_rocksdb(true),
);
// Set a checkpoint indicating we should have processed up to block 100
// Set a checkpoint that matches the sentinel shard data (max block = 30)
// This is a normal scenario where sentinel shards exist and are in sync with checkpoint
{
let provider = factory.database_provider_rw().unwrap();
provider
.save_stage_checkpoint(StageId::IndexAccountHistory, StageCheckpoint::new(100))
.save_stage_checkpoint(StageId::IndexAccountHistory, StageCheckpoint::new(30))
.unwrap();
provider.commit().unwrap();
}
let provider = factory.database_provider_ro().unwrap();
// RocksDB has only sentinel entries (no completed shards) but checkpoint is set.
// This is treated as a first-run/migration scenario - no unwind needed.
// RocksDB has sentinel entries matching the checkpoint - consistent state
let result = rocksdb.check_consistency(&provider).unwrap();
assert_eq!(
result, None,
"Sentinel-only entries with checkpoint should be treated as first run"
);
assert_eq!(result, None, "Sentinel-only entries matching checkpoint should be consistent");
}
#[test]
@@ -1402,4 +1466,101 @@ mod tests {
"Should require unwind to block 50 to rebuild AccountsHistory"
);
}
#[test]
fn test_prune_storages_history_sentinel_shard_rewrite() {
use reth_db_api::models::storage_sharded_key::StorageShardedKey;
let temp_dir = TempDir::new().unwrap();
let rocksdb = RocksDBBuilder::new(temp_dir.path())
.with_table::<tables::StoragesHistory>()
.build()
.unwrap();
let address = Address::random();
let storage_key = B256::random();
// Create a sentinel shard with blocks both above and below checkpoint 100
// Contains [10, 50, 150] - after pruning at checkpoint 100, should become [10, 50]
let sentinel_key = StorageShardedKey::new(address, storage_key, u64::MAX);
let block_list = BlockNumberList::new_pre_sorted([10, 50, 150]);
rocksdb.put::<tables::StoragesHistory>(sentinel_key.clone(), &block_list).unwrap();
// Create a test provider factory for MDBX
let factory = create_test_provider_factory();
factory.set_storage_settings_cache(
StorageSettings::legacy().with_storages_history_in_rocksdb(true),
);
// Set checkpoint to block 100
{
let provider = factory.database_provider_rw().unwrap();
provider
.save_stage_checkpoint(StageId::IndexStorageHistory, StageCheckpoint::new(100))
.unwrap();
provider.commit().unwrap();
}
let provider = factory.database_provider_ro().unwrap();
// Run consistency check - should prune the sentinel shard
let result = rocksdb.check_consistency(&provider).unwrap();
assert_eq!(result, None, "Should heal by pruning, no unwind needed");
// Verify the sentinel shard was rewritten (not deleted) with only [10, 50]
let rewritten = rocksdb.get::<tables::StoragesHistory>(sentinel_key).unwrap();
assert!(rewritten.is_some(), "Sentinel shard should be rewritten, not deleted");
let rewritten_list = rewritten.unwrap();
let blocks: Vec<u64> = rewritten_list.iter().collect();
assert_eq!(blocks, vec![10, 50], "Sentinel shard should contain only blocks <= 100");
}
#[test]
fn test_prune_accounts_history_sentinel_shard_rewrite() {
use reth_db_api::models::ShardedKey;
let temp_dir = TempDir::new().unwrap();
let rocksdb = RocksDBBuilder::new(temp_dir.path())
.with_table::<tables::AccountsHistory>()
.build()
.unwrap();
let address = Address::random();
// Create a sentinel shard with blocks both above and below checkpoint 100
// Contains [10, 50, 150] - after pruning at checkpoint 100, should become [10, 50]
let sentinel_key = ShardedKey::new(address, u64::MAX);
let block_list = BlockNumberList::new_pre_sorted([10, 50, 150]);
rocksdb.put::<tables::AccountsHistory>(sentinel_key.clone(), &block_list).unwrap();
// Create a test provider factory for MDBX
let factory = create_test_provider_factory();
factory.set_storage_settings_cache(
StorageSettings::legacy().with_account_history_in_rocksdb(true),
);
// Set checkpoint to block 100
{
let provider = factory.database_provider_rw().unwrap();
provider
.save_stage_checkpoint(StageId::IndexAccountHistory, StageCheckpoint::new(100))
.unwrap();
provider.commit().unwrap();
}
let provider = factory.database_provider_ro().unwrap();
// Run consistency check - should prune the sentinel shard
let result = rocksdb.check_consistency(&provider).unwrap();
assert_eq!(result, None, "Should heal by pruning, no unwind needed");
// Verify the sentinel shard was rewritten (not deleted) with only [10, 50]
let rewritten = rocksdb.get::<tables::AccountsHistory>(sentinel_key).unwrap();
assert!(rewritten.is_some(), "Sentinel shard should be rewritten, not deleted");
let rewritten_list = rewritten.unwrap();
let blocks: Vec<u64> = rewritten_list.iter().collect();
assert_eq!(blocks, vec![10, 50], "Sentinel shard should contain only blocks <= 100");
}
}