feat(rocksdb): static file based healing for TransactionHashNumbers (#21343)
@@ -9,13 +9,12 @@ use crate::StaticFileProviderFactory;
 use alloy_eips::eip2718::Encodable2718;
 use alloy_primitives::BlockNumber;
 use rayon::prelude::*;
-use reth_db::cursor::DbCursorRO;
-use reth_db_api::{tables, transaction::DbTx};
+use reth_db_api::tables;
 use reth_stages_types::StageId;
 use reth_static_file_types::StaticFileSegment;
 use reth_storage_api::{
-    ChangeSetReader, DBProvider, StageCheckpointReader, StorageChangeSetReader,
-    StorageSettingsCache, TransactionsProvider,
+    BlockBodyIndicesProvider, ChangeSetReader, DBProvider, StageCheckpointReader,
+    StorageChangeSetReader, StorageSettingsCache, TransactionsProvider,
 };
 use reth_storage_errors::provider::ProviderResult;
 use std::collections::HashSet;
@@ -55,15 +54,16 @@ impl RocksDBProvider {
             + StageCheckpointReader
             + StorageSettingsCache
             + StaticFileProviderFactory
+            + BlockBodyIndicesProvider
             + StorageChangeSetReader
             + ChangeSetReader
             + TransactionsProvider<Transaction: Encodable2718>,
     {
         let mut unwind_target: Option<BlockNumber> = None;

-        // Check TransactionHashNumbers if stored in RocksDB
+        // Heal TransactionHashNumbers if stored in RocksDB
         if provider.cached_storage_settings().transaction_hash_numbers_in_rocksdb &&
-            let Some(target) = self.check_transaction_hash_numbers(provider)?
+            let Some(target) = self.heal_transaction_hash_numbers(provider)?
         {
             unwind_target = Some(unwind_target.map_or(target, |t| t.min(target)));
         }
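A side note on the `map_or`/`min` pattern in the hunk above: each healing routine may report its own unwind target, and `check_consistency` keeps the lowest one so the caller unwinds far enough for every table. A minimal standalone sketch of that folding step (`merge_unwind_target` is an illustrative helper, not part of the patch; plain `u64` stands in for `BlockNumber`):

    /// Fold a newly reported unwind target into the running minimum.
    /// `None` means no table has requested an unwind yet.
    fn merge_unwind_target(current: Option<u64>, target: u64) -> Option<u64> {
        Some(current.map_or(target, |t| t.min(target)))
    }

    fn main() {
        let mut unwind_target: Option<u64> = None;
        unwind_target = merge_unwind_target(unwind_target, 100); // one table wants block 100
        unwind_target = merge_unwind_target(unwind_target, 42); // another wants block 42
        assert_eq!(unwind_target, Some(42)); // the lowest target wins
    }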
@@ -85,19 +85,13 @@ impl RocksDBProvider {
         Ok(unwind_target)
     }

-    /// Checks invariants for the `TransactionHashNumbers` table.
+    /// Heals the `TransactionHashNumbers` table.
     ///
-    /// Returns a block number to unwind to if MDBX is behind the checkpoint.
-    /// If static files are ahead of MDBX, excess `RocksDB` entries are pruned (healed).
-    ///
-    /// # Approach
-    ///
-    /// Instead of iterating `RocksDB` entries (which is expensive and doesn't give us the
-    /// tx range we need), we use static files and MDBX to determine what needs pruning:
-    /// - Static files are committed before `RocksDB`, so they're at least at the same height
-    /// - MDBX `TransactionBlocks` tells us what's been fully committed
-    /// - If static files have more transactions than MDBX, prune the excess range
-    fn check_transaction_hash_numbers<Provider>(
+    /// - Fast path: if checkpoint == 0 AND `RocksDB` has data, clear everything
+    /// - If `sf_tip` < checkpoint, return unwind target (static files behind)
+    /// - If `sf_tip` == checkpoint, nothing to do
+    /// - If `sf_tip` > checkpoint, heal via transaction ranges in batches
+    fn heal_transaction_hash_numbers<Provider>(
         &self,
         provider: &Provider,
     ) -> ProviderResult<Option<BlockNumber>>
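The four doc-comment bullets above amount to a small decision tree keyed on the stage checkpoint and the static-file tip. A standalone sketch of that policy under simplified assumptions (the `HealAction` enum and `heal_policy` function are illustrative names; plain `u64` block numbers stand in for the provider API):

    #[derive(Debug, PartialEq)]
    enum HealAction {
        ClearAll,    // checkpoint == 0 but RocksDB has entries: drop the table
        Unwind(u64), // static files behind the checkpoint: caller must unwind
        Nothing,     // tips agree: table is consistent
        PruneExcess, // static files ahead: prune RocksDB entries in tx-range batches
    }

    fn heal_policy(checkpoint: u64, sf_tip: u64, rocksdb_has_data: bool) -> HealAction {
        if checkpoint == 0 && rocksdb_has_data {
            return HealAction::ClearAll; // fast path
        }
        if sf_tip < checkpoint {
            // Should never happen: static files commit before RocksDB.
            return HealAction::Unwind(sf_tip);
        }
        if sf_tip == checkpoint {
            return HealAction::Nothing;
        }
        HealAction::PruneExcess
    }

    fn main() {
        assert_eq!(heal_policy(0, 5, true), HealAction::ClearAll);
        assert_eq!(heal_policy(10, 2, false), HealAction::Unwind(2));
        assert_eq!(heal_policy(7, 7, false), HealAction::Nothing);
        assert_eq!(heal_policy(5, 9, false), HealAction::PruneExcess);
    }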
@@ -105,73 +99,99 @@ impl RocksDBProvider {
         Provider: DBProvider
             + StageCheckpointReader
             + StaticFileProviderFactory
+            + BlockBodyIndicesProvider
             + TransactionsProvider<Transaction: Encodable2718>,
     {
         // Get the TransactionLookup stage checkpoint
         let checkpoint = provider
             .get_stage_checkpoint(StageId::TransactionLookup)?
             .map(|cp| cp.block_number)
             .unwrap_or(0);

-        // Get last tx_num from MDBX - this tells us what MDBX has fully committed
-        let mut cursor = provider.tx_ref().cursor_read::<tables::TransactionBlocks>()?;
-        let mdbx_last = cursor.last()?;
-
-        // Get highest tx_num from static files - this tells us what tx data is available
-        let highest_static_tx = provider
+        let sf_tip = provider
             .static_file_provider()
-            .get_highest_static_file_tx(StaticFileSegment::Transactions);
+            .get_highest_static_file_block(StaticFileSegment::Transactions)
+            .unwrap_or(0);

-        match (mdbx_last, highest_static_tx) {
-            (Some((mdbx_tx, mdbx_block)), Some(highest_tx)) if highest_tx > mdbx_tx => {
-                // Static files are ahead of MDBX - prune RocksDB entries for the excess range.
-                // This is the common case during recovery from a crash during unwinding.
-                tracing::info!(
-                    target: "reth::providers::rocksdb",
-                    mdbx_last_tx = mdbx_tx,
-                    mdbx_block,
-                    highest_static_tx = highest_tx,
-                    "Static files ahead of MDBX, pruning TransactionHashNumbers excess data"
-                );
-                self.prune_transaction_hash_numbers_in_range(provider, (mdbx_tx + 1)..=highest_tx)?;
-
-                // After pruning, check if MDBX is behind checkpoint
-                if checkpoint > mdbx_block {
-                    tracing::warn!(
-                        target: "reth::providers::rocksdb",
-                        mdbx_block,
-                        checkpoint,
-                        "MDBX behind checkpoint after pruning, unwind needed"
-                    );
-                    return Ok(Some(mdbx_block));
-                }
-            }
-            (Some((_mdbx_tx, mdbx_block)), _) => {
-                // MDBX and static files are in sync (or static files don't have more data).
-                // Check if MDBX is behind checkpoint.
-                if checkpoint > mdbx_block {
-                    tracing::warn!(
-                        target: "reth::providers::rocksdb",
-                        mdbx_block,
-                        checkpoint,
-                        "MDBX behind checkpoint, unwind needed"
-                    );
-                    return Ok(Some(mdbx_block));
-                }
-            }
-            (None, Some(highest_tx)) => {
-                // MDBX has no transactions but static files have data.
-                // This means RocksDB might have stale entries - prune them all.
-                tracing::info!(
-                    target: "reth::providers::rocksdb",
-                    highest_static_tx = highest_tx,
-                    "MDBX empty but static files have data, pruning all TransactionHashNumbers"
-                );
-                self.prune_transaction_hash_numbers_in_range(provider, 0..=highest_tx)?;
-            }
-            (None, None) => {
-                // Both MDBX and static files are empty, nothing to check.
-            }
-        }
+        // Fast path: if checkpoint is 0 and RocksDB has data, clear everything.
+        if checkpoint == 0 && self.first::<tables::TransactionHashNumbers>()?.is_some() {
+            tracing::info!(
+                target: "reth::providers::rocksdb",
+                "TransactionHashNumbers has data but checkpoint is 0, clearing all"
+            );
+            self.clear::<tables::TransactionHashNumbers>()?;
+            return Ok(None);
+        }
+
+        if sf_tip < checkpoint {
+            // This should never happen in normal operation - static files are always committed
+            // before RocksDB. If we get here, something is seriously wrong. The unwind is a
+            // best-effort attempt but is probably futile.
+            tracing::warn!(
+                target: "reth::providers::rocksdb",
+                sf_tip,
+                checkpoint,
+                "TransactionHashNumbers: static file tip behind checkpoint, unwind needed"
+            );
+            return Ok(Some(sf_tip));
+        }
+
+        // sf_tip == checkpoint - nothing to do
+        if sf_tip == checkpoint {
+            return Ok(None);
+        }
+
+        // Get end tx from static files (authoritative for sf_tip)
+        let sf_tip_end_tx = provider
+            .static_file_provider()
+            .get_highest_static_file_tx(StaticFileSegment::Transactions)
+            .unwrap_or(0);
+
+        // Get the first tx after the checkpoint block from MDBX (authoritative up to checkpoint)
+        let checkpoint_next_tx = provider
+            .block_body_indices(checkpoint)?
+            .map(|indices| indices.next_tx_num())
+            .unwrap_or(0);
+
+        if sf_tip_end_tx < checkpoint_next_tx {
+            // This should never happen in normal operation - static files should have all
+            // transactions up to sf_tip. If we get here, something is seriously wrong.
+            // The unwind is a best-effort attempt but is probably futile.
+            tracing::warn!(
+                target: "reth::providers::rocksdb",
+                sf_tip_end_tx,
+                checkpoint_next_tx,
+                checkpoint,
+                sf_tip,
+                "TransactionHashNumbers: static file tx tip behind checkpoint, unwind needed"
+            );
+            return Ok(Some(sf_tip));
+        }
+
+        tracing::info!(
+            target: "reth::providers::rocksdb",
+            checkpoint,
+            sf_tip,
+            checkpoint_next_tx,
+            sf_tip_end_tx,
+            "TransactionHashNumbers: healing via transaction ranges"
+        );
+
+        const BATCH_SIZE: u64 = 10_000;
+        let mut batch_start = checkpoint_next_tx;
+
+        while batch_start <= sf_tip_end_tx {
+            let batch_end = batch_start.saturating_add(BATCH_SIZE - 1).min(sf_tip_end_tx);
+
+            tracing::debug!(
+                target: "reth::providers::rocksdb",
+                batch_start,
+                batch_end,
+                "Pruning TransactionHashNumbers batch"
+            );
+
+            self.prune_transaction_hash_numbers_in_range(provider, batch_start..=batch_end)?;
+
+            batch_start = batch_end.saturating_add(1);
+        }

         Ok(None)
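Two details of the new body are worth calling out. First, the heal range is bounded by `checkpoint_next_tx` (the first tx after the checkpoint block, read from MDBX block body indices) and `sf_tip_end_tx` (the highest tx in static files), so only entries past the checkpoint are touched. Second, pruning walks that range in fixed windows of `BATCH_SIZE`. A standalone sketch of just the window arithmetic (the `batches` helper is illustrative; the constant and the `saturating_add` handling mirror the patch):

    const BATCH_SIZE: u64 = 10_000;

    /// Split the inclusive tx range `start..=end` into windows of at most `BATCH_SIZE`.
    fn batches(start: u64, end: u64) -> Vec<(u64, u64)> {
        let mut out = Vec::new();
        let mut batch_start = start;
        while batch_start <= end {
            // Inclusive window end; `min` clamps the final, possibly short, batch.
            let batch_end = batch_start.saturating_add(BATCH_SIZE - 1).min(end);
            out.push((batch_start, batch_end));
            batch_start = batch_end.saturating_add(1);
        }
        out
    }

    fn main() {
        // e.g. checkpoint_next_tx = 25_000, sf_tip_end_tx = 47_500
        assert_eq!(
            batches(25_000, 47_500),
            vec![(25_000, 34_999), (35_000, 44_999), (45_000, 47_500)]
        );
    }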
@@ -425,7 +445,7 @@ mod tests {
         BlockWriter, DatabaseProviderFactory, StageCheckpointWriter, TransactionsProvider,
     };
     use alloy_primitives::{Address, B256};
-    use reth_db::cursor::DbCursorRW;
+    use reth_db::cursor::{DbCursorRO, DbCursorRW};
     use reth_db_api::{
         models::{storage_sharded_key::StorageShardedKey, StorageSettings},
         tables::{self, BlockNumberList},
@@ -520,13 +540,15 @@ mod tests {
         let provider = factory.database_provider_ro().unwrap();

         // RocksDB is empty but checkpoint says block 100 was processed.
-        // This is treated as a first-run/migration scenario - no unwind needed.
+        // Since static file tip defaults to 0 when None, and 0 < 100, an unwind is triggered.
         let result = rocksdb.check_consistency(&provider).unwrap();
-        assert_eq!(result, None, "Empty data with checkpoint is treated as first run");
+        assert_eq!(result, Some(0), "Static file tip (0) behind checkpoint (100) triggers unwind");
     }

+    /// Tests that when checkpoint=0 and `RocksDB` has data, all entries are pruned.
+    /// This simulates a crash recovery scenario where the checkpoint was lost.
     #[test]
-    fn test_check_consistency_mdbx_empty_static_files_have_data_prunes_rocksdb() {
+    fn test_check_consistency_checkpoint_zero_with_rocksdb_data_prunes_all() {
         let temp_dir = TempDir::new().unwrap();
         let rocksdb = RocksDBBuilder::new(temp_dir.path())
             .with_table::<tables::TransactionHashNumbers>()
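The updated expectation in this test follows directly from the `unwrap_or(0)` default in the heal routine: an empty `Transactions` segment yields `sf_tip = 0`, so any nonzero checkpoint now triggers an unwind instead of being treated as a first run. A tiny sketch of that edge case (`sf_tip_or_default` is an illustrative name):

    fn sf_tip_or_default(highest_static_file_block: Option<u64>) -> u64 {
        // Mirrors `.get_highest_static_file_block(..).unwrap_or(0)` in the patch:
        // a missing segment is treated as tip 0, not as a separate "no data" case.
        highest_static_file_block.unwrap_or(0)
    }

    fn main() {
        let checkpoint = 100u64;
        let sf_tip = sf_tip_or_default(None); // no static files at all
        // 0 < 100, so check_consistency reports Some(0) as the unwind target.
        assert!(sf_tip < checkpoint);
        assert_eq!(sf_tip, 0);
    }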
@@ -564,22 +586,12 @@ mod tests {
             provider.commit().unwrap();
         }

-        // Simulate crash recovery: MDBX was reset but static files and RocksDB still have data.
-        // Clear TransactionBlocks to simulate empty MDBX state.
+        // Explicitly clear the TransactionLookup checkpoint to simulate crash recovery
         {
             let provider = factory.database_provider_rw().unwrap();
-            let mut cursor = provider.tx_ref().cursor_write::<tables::TransactionBlocks>().unwrap();
-            let mut to_delete = Vec::new();
-            let mut walker = cursor.walk(Some(0)).unwrap();
-            while let Some((tx_num, _)) = walker.next().transpose().unwrap() {
-                to_delete.push(tx_num);
-            }
-            drop(walker);
-            for tx_num in to_delete {
-                cursor.seek_exact(tx_num).unwrap();
-                cursor.delete_current().unwrap();
-            }
-
+            // No checkpoint set (checkpoint = 0)
+            provider
+                .save_stage_checkpoint(StageId::TransactionLookup, StageCheckpoint::new(0))
+                .unwrap();
             provider.commit().unwrap();
         }

@@ -588,12 +600,12 @@ mod tests {

         let provider = factory.database_provider_ro().unwrap();

-        // MDBX TransactionBlocks is empty, but static files have transaction data.
-        // This means RocksDB has stale data that should be pruned (healed).
+        // checkpoint = 0 but RocksDB has data.
+        // This means RocksDB has stale data that should be cleared.
         let result = rocksdb.check_consistency(&provider).unwrap();
-        assert_eq!(result, None, "Should heal by pruning, no unwind needed");
+        assert_eq!(result, None, "Should heal by clearing, no unwind needed");

-        // Verify data was pruned
+        // Verify data was cleared
         for hash in &tx_hashes {
             assert!(
                 rocksdb.get::<tables::TransactionHashNumbers>(*hash).unwrap().is_none(),
@@ -669,7 +681,6 @@ mod tests {
             "RocksDB should be empty after pruning"
         );
     }
-
     #[test]
     fn test_check_consistency_mdbx_behind_checkpoint_needs_unwind() {
         let temp_dir = TempDir::new().unwrap();
@@ -707,9 +718,9 @@ mod tests {
             provider.commit().unwrap();
         }

-        // Now simulate a scenario where checkpoint is ahead of MDBX.
-        // This happens when the checkpoint was saved but MDBX data was lost/corrupted.
+        // sf_tip is at block 2, checkpoint is at block 10
+        // Since sf_tip < checkpoint, we need to unwind to sf_tip
         // Set checkpoint to block 10 (beyond our actual data at block 2)
         {
             let provider = factory.database_provider_rw().unwrap();
             provider
@@ -720,15 +731,9 @@ mod tests {

         let provider = factory.database_provider_ro().unwrap();

-        // MDBX has data up to block 2, but checkpoint says block 10 was processed.
-        // The static files highest tx matches MDBX last tx (both at block 2).
-        // Checkpoint > mdbx_block means we need to unwind to rebuild.
+        // sf_tip (2) < checkpoint (10), so unwind to sf_tip is needed
         let result = rocksdb.check_consistency(&provider).unwrap();
-        assert_eq!(
-            result,
-            Some(2),
-            "Should require unwind to block 2 (MDBX's last block) to rebuild from checkpoint"
-        );
+        assert_eq!(result, Some(2), "sf_tip < checkpoint requires unwind to sf_tip");
     }

     #[test]