feat(storage): add AccountsHistory RocksDB consistency check (#20594)

This commit is contained in:
YK
2025-12-28 09:59:02 +08:00
committed by GitHub
parent e595b58c28
commit a92cbb5e8b

View File

@@ -69,6 +69,13 @@ impl RocksDBProvider {
unwind_target = Some(unwind_target.map_or(target, |t| t.min(target)));
}
// Check AccountsHistory if stored in RocksDB
if provider.cached_storage_settings().account_history_in_rocksdb &&
let Some(target) = self.check_accounts_history(provider)?
{
unwind_target = Some(unwind_target.map_or(target, |t| t.min(target)));
}
Ok(unwind_target)
}
@@ -275,6 +282,18 @@ impl RocksDBProvider {
"StoragesHistory ahead of checkpoint, pruning excess data"
);
self.prune_storages_history_above(checkpoint)?;
return Ok(None);
}
// If RocksDB is behind the checkpoint, request an unwind to rebuild.
if max_highest_block < checkpoint {
tracing::warn!(
target: "reth::providers::rocksdb",
rocks_highest = max_highest_block,
checkpoint,
"StoragesHistory behind checkpoint, unwind needed"
);
return Ok(Some(max_highest_block));
}
Ok(None)
@@ -327,6 +346,125 @@ impl RocksDBProvider {
Ok(())
}
/// Checks invariants for the `AccountsHistory` table.
///
/// Returns a block number to unwind to if `RocksDB` is behind the checkpoint.
/// If `RocksDB` is ahead of the checkpoint, excess entries are pruned (healed).
fn check_accounts_history<Provider>(
    &self,
    provider: &Provider,
) -> ProviderResult<Option<BlockNumber>>
where
    Provider: DBProvider + StageCheckpointReader,
{
    // Resolve the IndexAccountHistory stage checkpoint; 0 means the stage never ran.
    let checkpoint = provider
        .get_stage_checkpoint(StageId::IndexAccountHistory)?
        .map(|cp| cp.block_number)
        .unwrap_or(0);

    // An empty table is only inconsistent when the stage claims progress;
    // in that case the caller must unwind to genesis to rebuild.
    if self.first::<tables::AccountsHistory>()?.is_none() {
        return Ok((checkpoint > 0).then_some(0));
    }

    // Data exists but the stage never ran: wipe the table (heal in place).
    if checkpoint == 0 {
        tracing::info!(
            target: "reth::providers::rocksdb",
            "AccountsHistory has data but checkpoint is 0, clearing all"
        );
        self.prune_accounts_history_above(0)?;
        return Ok(None);
    }

    // Scan all entries for the largest highest_block_number, ignoring the
    // u64::MAX sentinel used for the open-ended shard.
    let mut rocks_highest = 0u64;
    for entry in self.iter::<tables::AccountsHistory>()? {
        let (key, _) = entry?;
        let block = key.highest_block_number;
        if block != u64::MAX {
            rocks_highest = rocks_highest.max(block);
        }
    }

    if rocks_highest > checkpoint {
        // RocksDB is ahead of the checkpoint: drop the excess entries.
        tracing::info!(
            target: "reth::providers::rocksdb",
            rocks_highest,
            checkpoint,
            "AccountsHistory ahead of checkpoint, pruning excess data"
        );
        self.prune_accounts_history_above(checkpoint)?;
        Ok(None)
    } else if rocks_highest < checkpoint {
        // RocksDB is behind the checkpoint: request an unwind to rebuild.
        tracing::warn!(
            target: "reth::providers::rocksdb",
            rocks_highest,
            checkpoint,
            "AccountsHistory behind checkpoint, unwind needed"
        );
        Ok(Some(rocks_highest))
    } else {
        // Exactly in sync.
        Ok(None)
    }
}
/// Prunes `AccountsHistory` entries where `highest_block_number` > `max_block`.
///
/// For `AccountsHistory`, the key is `ShardedKey<Address>` which contains
/// `highest_block_number`, so we can iterate and delete entries where
/// `key.highest_block_number > max_block`.
///
/// TODO(<https://github.com/paradigmxyz/reth/issues/20417>): this iterates the whole table,
/// which is inefficient. Use changeset-based pruning instead.
fn prune_accounts_history_above(&self, max_block: BlockNumber) -> ProviderResult<()> {
    use alloy_primitives::Address;
    use reth_db_api::models::ShardedKey;

    // max_block == 0 means "wipe the table"; otherwise the u64::MAX sentinel
    // shard and anything at or below max_block is kept.
    let should_prune =
        |highest: u64| max_block == 0 || (highest != u64::MAX && highest > max_block);

    let mut victims: Vec<ShardedKey<Address>> = Vec::new();
    for entry in self.iter::<tables::AccountsHistory>()? {
        let (key, _) = entry?;
        if should_prune(key.highest_block_number) {
            victims.push(key);
        }
    }

    // Nothing to delete — skip the log line and the write batch entirely.
    if victims.is_empty() {
        return Ok(());
    }

    tracing::info!(
        target: "reth::providers::rocksdb",
        deleted_count = victims.len(),
        max_block,
        "Pruning AccountsHistory entries"
    );
    // Apply all deletions atomically in a single batch.
    let mut batch = self.batch();
    for key in victims {
        batch.delete::<tables::AccountsHistory>(key)?;
    }
    batch.commit()?;
    Ok(())
}
}
#[cfg(test)]
@@ -803,6 +941,46 @@ mod tests {
);
}
#[test]
fn test_check_consistency_storages_history_behind_checkpoint_needs_unwind() {
    use reth_db_api::models::storage_sharded_key::StorageShardedKey;
    let tmp = TempDir::new().unwrap();
    let rocksdb = RocksDBBuilder::new(tmp.path())
        .with_table::<tables::StoragesHistory>()
        .build()
        .unwrap();

    // Seed RocksDB with a shard whose highest block (50) trails the checkpoint.
    let stale_key = StorageShardedKey::new(Address::ZERO, B256::ZERO, 50);
    let shard = BlockNumberList::new_pre_sorted([10, 20, 30, 50]);
    rocksdb.put::<tables::StoragesHistory>(stale_key, &shard).unwrap();

    let factory = create_test_provider_factory();
    factory.set_storage_settings_cache(
        StorageSettings::legacy().with_storages_history_in_rocksdb(true),
    );

    // Record that the IndexStorageHistory stage reached block 100.
    {
        let rw = factory.database_provider_rw().unwrap();
        rw.save_stage_checkpoint(StageId::IndexStorageHistory, StageCheckpoint::new(100))
            .unwrap();
        rw.commit().unwrap();
    }

    // RocksDB stops at block 50 while the checkpoint claims 100: unwind to 50.
    let ro = factory.database_provider_ro().unwrap();
    assert_eq!(
        rocksdb.check_consistency(&ro).unwrap(),
        Some(50),
        "Should require unwind to block 50 to rebuild StoragesHistory"
    );
}
/// Test that pruning works by fetching transactions and computing their hashes,
/// rather than iterating all rows. This test uses random blocks with unique
/// transactions so we can verify the correct entries are pruned.
@@ -918,4 +1096,175 @@ mod tests {
max_tx_to_keep
);
}
#[test]
fn test_check_consistency_accounts_history_empty_with_checkpoint_needs_unwind() {
    let tmp = TempDir::new().unwrap();
    let rocksdb = RocksDBBuilder::new(tmp.path())
        .with_table::<tables::AccountsHistory>()
        .build()
        .unwrap();

    // MDBX-side factory with account history routed to RocksDB.
    let factory = create_test_provider_factory();
    factory.set_storage_settings_cache(
        StorageSettings::legacy().with_account_history_in_rocksdb(true),
    );

    // Claim stage progress up to block 100 while RocksDB holds nothing.
    {
        let rw = factory.database_provider_rw().unwrap();
        rw.save_stage_checkpoint(StageId::IndexAccountHistory, StageCheckpoint::new(100))
            .unwrap();
        rw.commit().unwrap();
    }

    // Empty table + non-zero checkpoint must trigger a rebuild from genesis.
    let ro = factory.database_provider_ro().unwrap();
    assert_eq!(
        rocksdb.check_consistency(&ro).unwrap(),
        Some(0),
        "Should require unwind to block 0 to rebuild AccountsHistory"
    );
}
#[test]
fn test_check_consistency_accounts_history_has_data_no_checkpoint_prunes_data() {
    use reth_db_api::models::ShardedKey;
    let tmp = TempDir::new().unwrap();
    let rocksdb = RocksDBBuilder::new(tmp.path())
        .with_table::<tables::AccountsHistory>()
        .build()
        .unwrap();

    // Seed one shard so the table is non-empty, and confirm it landed.
    let shard = BlockNumberList::new_pre_sorted([10, 20, 30, 50]);
    rocksdb.put::<tables::AccountsHistory>(ShardedKey::new(Address::ZERO, 50), &shard).unwrap();
    assert!(rocksdb.last::<tables::AccountsHistory>().unwrap().is_some());

    // No checkpoint is ever saved, so stage progress reads as 0.
    let factory = create_test_provider_factory();
    factory.set_storage_settings_cache(
        StorageSettings::legacy().with_account_history_in_rocksdb(true),
    );

    // Stale data against a zero checkpoint heals in place: prune, no unwind.
    let ro = factory.database_provider_ro().unwrap();
    assert_eq!(
        rocksdb.check_consistency(&ro).unwrap(),
        None,
        "Should heal by pruning, no unwind needed"
    );
    assert!(
        rocksdb.last::<tables::AccountsHistory>().unwrap().is_none(),
        "RocksDB should be empty after pruning"
    );
}
#[test]
fn test_check_consistency_accounts_history_ahead_of_checkpoint_prunes_excess() {
    use reth_db_api::models::ShardedKey;
    let tmp = TempDir::new().unwrap();
    let rocksdb = RocksDBBuilder::new(tmp.path())
        .with_table::<tables::AccountsHistory>()
        .build()
        .unwrap();

    // Shards ending at blocks 50, 100, 150, plus the open-ended u64::MAX sentinel.
    let key_block_50 = ShardedKey::new(Address::ZERO, 50);
    let key_block_100 = ShardedKey::new(Address::random(), 100);
    let key_block_150 = ShardedKey::new(Address::random(), 150);
    let key_block_max = ShardedKey::new(Address::random(), u64::MAX);
    let shard = BlockNumberList::new_pre_sorted([10, 20, 30]);
    for key in [&key_block_50, &key_block_100, &key_block_150, &key_block_max] {
        rocksdb.put::<tables::AccountsHistory>(key.clone(), &shard).unwrap();
    }

    let factory = create_test_provider_factory();
    factory.set_storage_settings_cache(
        StorageSettings::legacy().with_account_history_in_rocksdb(true),
    );

    // Record stage progress up to block 100.
    {
        let rw = factory.database_provider_rw().unwrap();
        rw.save_stage_checkpoint(StageId::IndexAccountHistory, StageCheckpoint::new(100))
            .unwrap();
        rw.commit().unwrap();
    }

    // highest_block = 150 exceeds the checkpoint (100); the check should prune
    // only that entry (never the u64::MAX sentinel) and report consistency.
    let ro = factory.database_provider_ro().unwrap();
    assert_eq!(
        rocksdb.check_consistency(&ro).unwrap(),
        None,
        "Should heal by pruning, no unwind needed"
    );

    assert!(
        rocksdb.get::<tables::AccountsHistory>(key_block_50).unwrap().is_some(),
        "Entry with highest_block=50 should remain"
    );
    assert!(
        rocksdb.get::<tables::AccountsHistory>(key_block_100).unwrap().is_some(),
        "Entry with highest_block=100 should remain"
    );
    assert!(
        rocksdb.get::<tables::AccountsHistory>(key_block_150).unwrap().is_none(),
        "Entry with highest_block=150 should be pruned"
    );
    assert!(
        rocksdb.get::<tables::AccountsHistory>(key_block_max).unwrap().is_some(),
        "Entry with highest_block=u64::MAX (sentinel) should remain"
    );
}
#[test]
fn test_check_consistency_accounts_history_behind_checkpoint_needs_unwind() {
    use reth_db_api::models::ShardedKey;
    let tmp = TempDir::new().unwrap();
    let rocksdb = RocksDBBuilder::new(tmp.path())
        .with_table::<tables::AccountsHistory>()
        .build()
        .unwrap();

    // Seed a shard whose highest block (50) trails the checkpoint below.
    let shard = BlockNumberList::new_pre_sorted([10, 20, 30, 50]);
    rocksdb.put::<tables::AccountsHistory>(ShardedKey::new(Address::ZERO, 50), &shard).unwrap();

    let factory = create_test_provider_factory();
    factory.set_storage_settings_cache(
        StorageSettings::legacy().with_account_history_in_rocksdb(true),
    );

    // Record that the IndexAccountHistory stage reached block 100.
    {
        let rw = factory.database_provider_rw().unwrap();
        rw.save_stage_checkpoint(StageId::IndexAccountHistory, StageCheckpoint::new(100))
            .unwrap();
        rw.commit().unwrap();
    }

    // RocksDB stops at block 50 while the checkpoint claims 100: unwind to 50.
    let ro = factory.database_provider_ro().unwrap();
    assert_eq!(
        rocksdb.check_consistency(&ro).unwrap(),
        Some(50),
        "Should require unwind to block 50 to rebuild AccountsHistory"
    );
}
}