Mirror of https://github.com/paradigmxyz/reth.git, synced 2026-01-10 07:48:19 -05:00
feat(storage): add RocksDB consistency check on startup
Amp-Thread-ID: https://ampcode.com/threads/T-019b2fb1-ce5c-7251-b454-0d7472a0754a
@@ -66,8 +66,9 @@ use reth_node_metrics::{
 };
 use reth_provider::{
     providers::{NodeTypesForProvider, ProviderNodeTypes, RocksDBProvider, StaticFileProvider},
-    BlockHashReader, BlockNumReader, ProviderError, ProviderFactory, ProviderResult,
-    StageCheckpointReader, StaticFileProviderBuilder, StaticFileProviderFactory,
+    BlockHashReader, BlockNumReader, DatabaseProviderFactory, ProviderError, ProviderFactory,
+    ProviderResult, RocksDBProviderFactory, StageCheckpointReader, StaticFileProviderBuilder,
+    StaticFileProviderFactory,
 };
 use reth_prune::{PruneModes, PrunerBuilder};
 use reth_rpc_builder::config::RethRpcServerConfig;
@@ -501,20 +502,51 @@ where
         )?
         .with_prune_modes(self.prune_modes());
 
-        // Check for consistency between database and static files. If it fails, it unwinds to
-        // the first block that's consistent between database and static files.
-        if let Some(unwind_target) =
-            factory.static_file_provider().check_consistency(&factory.provider()?)?
-        {
+        // Check for consistency between database, static files, and RocksDB. If any
+        // inconsistencies are found, unwind to the first block that's consistent across all
+        // storage layers.
+        //
+        // Static file check runs first since RocksDB pruning logic may depend on static file data.
+        // We compute a combined unwind target from both checks and run a single unwind pass.
+        let static_file_unwind = factory
+            .static_file_provider()
+            .check_consistency(&factory.provider()?)?
+            .map(|target| match target {
+                PipelineTarget::Unwind(block) => block,
+                PipelineTarget::Sync(_) => unreachable!("check_consistency returns Unwind"),
+            });
+
+        // RocksDB consistency check - runs after static files since it may use static file data
+        // for pruning transaction hashes.
+        let rocksdb_unwind = factory
+            .rocksdb_provider()
+            .check_consistency(&factory.database_provider_ro()?)?;
+
+        // Combine unwind targets - take the minimum (most conservative) if both exist
+        let unwind_target = match (static_file_unwind, rocksdb_unwind) {
+            (None, None) => None,
+            (Some(a), None) | (None, Some(a)) => Some(a),
+            (Some(a), Some(b)) => Some(a.min(b)),
+        };
+
+        if let Some(unwind_block) = unwind_target {
             // Highly unlikely to happen, and given its destructive nature, it's better to panic
             // instead.
+            let inconsistency_source = match (static_file_unwind, rocksdb_unwind) {
+                (Some(_), Some(_)) => "static file <> database and RocksDB <> database",
+                (Some(_), None) => "static file <> database",
+                (None, Some(_)) => "RocksDB <> database",
+                (None, None) => unreachable!(),
+            };
             assert_ne!(
-                unwind_target,
-                PipelineTarget::Unwind(0),
-                "A static file <> database inconsistency was found that would trigger an unwind to block 0"
+                unwind_block,
+                0,
+                "A {inconsistency_source} inconsistency was found that would trigger an unwind to block 0"
             );
 
-            info!(target: "reth::cli", unwind_target = %unwind_target, "Executing an unwind after a failed storage consistency check.");
+            let unwind_target = PipelineTarget::Unwind(unwind_block);
+
+            info!(target: "reth::cli", %unwind_target, %inconsistency_source, "Executing unwind after consistency check.");
 
             let (_tip_tx, tip_rx) = watch::channel(B256::ZERO);
 
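Note on the combination rule in the hunk above: when both the static-file and RocksDB checks report an unwind target, the lower block number wins, because unwinding further back is the conservative choice that satisfies both storage layers. A minimal standalone sketch of that rule (plain Rust, not reth code; the function name is illustrative):

// Standalone illustration of the combine step shown above.
fn combine_unwind_targets(static_file: Option<u64>, rocksdb: Option<u64>) -> Option<u64> {
    match (static_file, rocksdb) {
        (None, None) => None,
        (Some(a), None) | (None, Some(a)) => Some(a),
        // Both layers need an unwind: the lower (earlier) block satisfies both.
        (Some(a), Some(b)) => Some(a.min(b)),
    }
}

fn main() {
    assert_eq!(combine_unwind_targets(None, None), None); // everything consistent, no unwind
    assert_eq!(combine_unwind_targets(Some(120), None), Some(120));
    assert_eq!(combine_unwind_targets(Some(120), Some(90)), Some(90)); // unwind to the earlier block
}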
@@ -548,7 +580,7 @@ where
                 }),
             );
             rx.await?.inspect_err(|err| {
-                error!(target: "reth::cli", unwind_target = %unwind_target, %err, "failed to run unwind")
+                error!(target: "reth::cli", %unwind_target, %err, "failed to run unwind")
             })?;
         }
 

@@ -69,6 +69,13 @@ impl RocksDBProvider {
             unwind_target = Some(unwind_target.map_or(target, |t| t.min(target)));
         }
 
+        // Check AccountsHistory if stored in RocksDB
+        if provider.cached_storage_settings().account_history_in_rocksdb &&
+            let Some(target) = self.check_accounts_history(provider)?
+        {
+            unwind_target = Some(unwind_target.map_or(target, |t| t.min(target)));
+        }
+
         Ok(unwind_target)
     }
 
@@ -327,6 +334,113 @@ impl RocksDBProvider {
 
         Ok(())
     }
+
+    /// Checks invariants for the `AccountsHistory` table.
+    ///
+    /// Returns a block number to unwind to if `RocksDB` is behind the checkpoint.
+    /// If `RocksDB` is ahead of the checkpoint, excess entries are pruned (healed).
+    fn check_accounts_history<Provider>(
+        &self,
+        provider: &Provider,
+    ) -> ProviderResult<Option<BlockNumber>>
+    where
+        Provider: DBProvider + StageCheckpointReader,
+    {
+        // Get the IndexAccountHistory stage checkpoint
+        let checkpoint = provider
+            .get_stage_checkpoint(StageId::IndexAccountHistory)?
+            .map(|cp| cp.block_number)
+            .unwrap_or(0);
+
+        // Check if RocksDB has any data
+        let rocks_first = self.first::<tables::AccountsHistory>()?;
+
+        match rocks_first {
+            Some(_) => {
+                // If checkpoint is 0 but we have data, clear everything
+                if checkpoint == 0 {
+                    tracing::info!(
+                        target: "reth::providers::rocksdb",
+                        "AccountsHistory has data but checkpoint is 0, clearing all"
+                    );
+                    self.prune_accounts_history_above(0)?;
+                    return Ok(None);
+                }
+
+                // Find the max highest_block_number (excluding u64::MAX sentinel) across all
+                // entries
+                let mut max_highest_block = 0u64;
+                for result in self.iter::<tables::AccountsHistory>()? {
+                    let (key, _) = result?;
+                    let highest = key.highest_block_number;
+                    if highest != u64::MAX && highest > max_highest_block {
+                        max_highest_block = highest;
+                    }
+                }
+
+                // If any entry has highest_block > checkpoint, prune excess
+                if max_highest_block > checkpoint {
+                    tracing::info!(
+                        target: "reth::providers::rocksdb",
+                        rocks_highest = max_highest_block,
+                        checkpoint,
+                        "AccountsHistory ahead of checkpoint, pruning excess data"
+                    );
+                    self.prune_accounts_history_above(checkpoint)?;
+                }
+
+                Ok(None)
+            }
+            None => {
+                // Empty RocksDB table
+                if checkpoint > 0 {
+                    // Stage says we should have data but we don't
+                    return Ok(Some(0));
+                }
+                Ok(None)
+            }
+        }
+    }
+
+    /// Prunes `AccountsHistory` entries where `highest_block_number` > `max_block`.
+    ///
+    /// For `AccountsHistory`, the key is `ShardedKey<Address>` which contains
+    /// `highest_block_number`, so we can iterate and delete entries where
+    /// `key.highest_block_number > max_block`.
+    ///
+    /// TODO(<https://github.com/paradigmxyz/reth/issues/20417>): this iterates the whole table,
+    /// which is inefficient. Use changeset-based pruning instead.
+    fn prune_accounts_history_above(&self, max_block: BlockNumber) -> ProviderResult<()> {
+        use alloy_primitives::Address;
+        use reth_db_api::models::ShardedKey;
+
+        let mut to_delete: Vec<ShardedKey<Address>> = Vec::new();
+        for result in self.iter::<tables::AccountsHistory>()? {
+            let (key, _) = result?;
+            let highest_block = key.highest_block_number;
+            if max_block == 0 || (highest_block != u64::MAX && highest_block > max_block) {
+                to_delete.push(key);
+            }
+        }
+
+        let deleted = to_delete.len();
+        if deleted > 0 {
+            tracing::info!(
+                target: "reth::providers::rocksdb",
+                deleted_count = deleted,
+                max_block,
+                "Pruning AccountsHistory entries"
+            );
+
+            let mut batch = self.batch();
+            for key in to_delete {
+                batch.delete::<tables::AccountsHistory>(key)?;
+            }
+            batch.commit()?;
+        }
+
+        Ok(())
+    }
 }
 
 #[cfg(test)]
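Note on the TODO above: changeset-based pruning would bound the work by the blocks above `max_block` instead of scanning the whole `AccountsHistory` table. A toy sketch of that idea, using in-memory maps in place of the real changeset and history tables (the type names, helper function, and data layout here are illustrative assumptions, not reth's API):

use std::collections::BTreeMap;

// Toy stand-ins: block number -> addresses changed in that block (changeset index),
// and (address, highest_block_number) -> shard contents (history index).
type Address = [u8; 20];
type ChangeSets = BTreeMap<u64, Vec<Address>>;
type History = BTreeMap<(Address, u64), Vec<u64>>;

fn prune_history_above(changesets: &ChangeSets, history: &mut History, max_block: u64) {
    // Only addresses touched after `max_block` can own shards that need removing.
    let mut touched: Vec<Address> = changesets
        .range(max_block.saturating_add(1)..)
        .flat_map(|(_, addrs)| addrs.iter().copied())
        .collect();
    touched.sort_unstable();
    touched.dedup();

    for addr in touched {
        // Drop completed shards past the target; the u64::MAX sentinel shard would be
        // truncated to blocks <= max_block instead (truncation elided in this sketch).
        history.retain(|(a, highest), _| {
            !(*a == addr && *highest != u64::MAX && *highest > max_block)
        });
    }
}

fn main() {
    let a = [0u8; 20];
    let changesets = ChangeSets::from([(150, vec![a]), (160, vec![a])]);
    let mut history = History::from([((a, 150), vec![120, 150]), ((a, u64::MAX), vec![160])]);
    prune_history_above(&changesets, &mut history, 100);
    assert!(!history.contains_key(&(a, 150))); // completed shard past block 100 is removed
    assert!(history.contains_key(&(a, u64::MAX))); // sentinel shard is kept
}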
@@ -803,6 +917,137 @@ mod tests {
         );
     }
 
+    #[test]
+    fn test_check_consistency_accounts_history_empty_with_checkpoint_needs_unwind() {
+        let temp_dir = TempDir::new().unwrap();
+        let rocksdb = RocksDBBuilder::new(temp_dir.path())
+            .with_table::<tables::AccountsHistory>()
+            .build()
+            .unwrap();
+
+        // Create a test provider factory for MDBX
+        let factory = create_test_provider_factory();
+        factory.set_storage_settings_cache(
+            StorageSettings::legacy().with_account_history_in_rocksdb(true),
+        );
+
+        // Set a checkpoint indicating we should have processed up to block 100
+        {
+            let provider = factory.database_provider_rw().unwrap();
+            provider
+                .save_stage_checkpoint(StageId::IndexAccountHistory, StageCheckpoint::new(100))
+                .unwrap();
+            provider.commit().unwrap();
+        }
+
+        let provider = factory.database_provider_ro().unwrap();
+
+        // RocksDB is empty but checkpoint says block 100 was processed
+        let result = rocksdb.check_consistency(&provider).unwrap();
+        assert_eq!(result, Some(0), "Should require unwind to block 0 to rebuild AccountsHistory");
+    }
+
+    #[test]
+    fn test_check_consistency_accounts_history_has_data_no_checkpoint_prunes_data() {
+        use reth_db_api::models::ShardedKey;
+
+        let temp_dir = TempDir::new().unwrap();
+        let rocksdb = RocksDBBuilder::new(temp_dir.path())
+            .with_table::<tables::AccountsHistory>()
+            .build()
+            .unwrap();
+
+        // Insert data into RocksDB
+        let key = ShardedKey::new(Address::ZERO, 50);
+        let block_list = BlockNumberList::new_pre_sorted([10, 20, 30, 50]);
+        rocksdb.put::<tables::AccountsHistory>(key, &block_list).unwrap();
+
+        // Verify data exists
+        assert!(rocksdb.last::<tables::AccountsHistory>().unwrap().is_some());
+
+        // Create a test provider factory for MDBX with NO checkpoint
+        let factory = create_test_provider_factory();
+        factory.set_storage_settings_cache(
+            StorageSettings::legacy().with_account_history_in_rocksdb(true),
+        );
+
+        let provider = factory.database_provider_ro().unwrap();
+
+        // RocksDB has data but checkpoint is 0
+        // This means RocksDB has stale data that should be pruned (healed)
+        let result = rocksdb.check_consistency(&provider).unwrap();
+        assert_eq!(result, None, "Should heal by pruning, no unwind needed");
+
+        // Verify data was pruned
+        assert!(
+            rocksdb.last::<tables::AccountsHistory>().unwrap().is_none(),
+            "RocksDB should be empty after pruning"
+        );
+    }
+
+    #[test]
+    fn test_check_consistency_accounts_history_ahead_of_checkpoint_prunes_excess() {
+        use reth_db_api::models::ShardedKey;
+
+        let temp_dir = TempDir::new().unwrap();
+        let rocksdb = RocksDBBuilder::new(temp_dir.path())
+            .with_table::<tables::AccountsHistory>()
+            .build()
+            .unwrap();
+
+        // Insert data into RocksDB with different highest_block_numbers
+        let key_block_50 = ShardedKey::new(Address::ZERO, 50);
+        let key_block_100 = ShardedKey::new(Address::random(), 100);
+        let key_block_150 = ShardedKey::new(Address::random(), 150);
+        let key_block_max = ShardedKey::new(Address::random(), u64::MAX);
+
+        let block_list = BlockNumberList::new_pre_sorted([10, 20, 30]);
+        rocksdb.put::<tables::AccountsHistory>(key_block_50.clone(), &block_list).unwrap();
+        rocksdb.put::<tables::AccountsHistory>(key_block_100.clone(), &block_list).unwrap();
+        rocksdb.put::<tables::AccountsHistory>(key_block_150.clone(), &block_list).unwrap();
+        rocksdb.put::<tables::AccountsHistory>(key_block_max.clone(), &block_list).unwrap();
+
+        // Create a test provider factory for MDBX
+        let factory = create_test_provider_factory();
+        factory.set_storage_settings_cache(
+            StorageSettings::legacy().with_account_history_in_rocksdb(true),
+        );
+
+        // Set checkpoint to block 100
+        {
+            let provider = factory.database_provider_rw().unwrap();
+            provider
+                .save_stage_checkpoint(StageId::IndexAccountHistory, StageCheckpoint::new(100))
+                .unwrap();
+            provider.commit().unwrap();
+        }
+
+        let provider = factory.database_provider_ro().unwrap();
+
+        // RocksDB has entries with highest_block = 150 which exceeds checkpoint (100)
+        // Should prune entries where highest_block > 100 (but not u64::MAX sentinel)
+        let result = rocksdb.check_consistency(&provider).unwrap();
+        assert_eq!(result, None, "Should heal by pruning, no unwind needed");
+
+        // Verify key_block_150 was pruned, but others remain
+        assert!(
+            rocksdb.get::<tables::AccountsHistory>(key_block_50).unwrap().is_some(),
+            "Entry with highest_block=50 should remain"
+        );
+        assert!(
+            rocksdb.get::<tables::AccountsHistory>(key_block_100).unwrap().is_some(),
+            "Entry with highest_block=100 should remain"
+        );
+        assert!(
+            rocksdb.get::<tables::AccountsHistory>(key_block_150).unwrap().is_none(),
+            "Entry with highest_block=150 should be pruned"
+        );
+        assert!(
+            rocksdb.get::<tables::AccountsHistory>(key_block_max).unwrap().is_some(),
+            "Entry with highest_block=u64::MAX (sentinel) should remain"
+        );
+    }
+
     /// Test that pruning works by fetching transactions and computing their hashes,
     /// rather than iterating all rows. This test uses random blocks with unique
     /// transactions so we can verify the correct entries are pruned.

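The three tests above pin down the shard-pruning predicate: shards whose real highest_block_number exceeds the checkpoint are deleted, the u64::MAX "latest" shard is always kept, and a zero checkpoint clears everything. A minimal standalone restatement of that predicate (plain Rust, not reth code), mirroring the expectations asserted in the ahead-of-checkpoint test:

// Standalone restatement of the pruning predicate exercised by the tests above.
fn should_prune(highest_block_number: u64, checkpoint: u64) -> bool {
    // checkpoint 0 means "no history indexed yet": clear every shard, sentinel included.
    checkpoint == 0 || (highest_block_number != u64::MAX && highest_block_number > checkpoint)
}

fn main() {
    let checkpoint = 100;
    assert!(!should_prune(50, checkpoint)); // completed shard at or below the checkpoint stays
    assert!(!should_prune(100, checkpoint));
    assert!(should_prune(150, checkpoint)); // shard past the checkpoint is removed
    assert!(!should_prune(u64::MAX, checkpoint)); // the "latest" sentinel shard always stays
    assert!(should_prune(50, 0)); // no checkpoint: everything is cleared
}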
@@ -77,6 +77,16 @@ impl RocksDBProvider {
     pub const fn tx(&self) -> RocksTx {
         RocksTx
     }
+
+    /// Check consistency of `RocksDB` tables (stub implementation).
+    ///
+    /// Returns `None` since there is no `RocksDB` data to check when the feature is disabled.
+    pub fn check_consistency<Provider>(
+        &self,
+        _provider: &Provider,
+    ) -> ProviderResult<Option<alloy_primitives::BlockNumber>> {
+        Ok(None)
+    }
 }
 
 /// A stub batch writer for `RocksDB` on non-Unix platforms.