feat(stages): add RocksDB support for IndexStorageHistoryStage and IndexAccountHistoryStage

Implements #20390 - Modify history indexing stages to use EitherWriter for RocksDB writes.

**Execute changes:**
- Add RocksDB write path to IndexStorageHistoryStage using EitherWriter
- Add RocksDB write path to IndexAccountHistoryStage using EitherWriter
- Add load_storage_history_indices_via_writer and load_account_history_indices_via_writer
  helpers with proper sharding logic and merge support for incremental syncs
- Add StorageSettingsCache, RocksDBProviderFactory, and NodePrimitivesProvider trait bounds
  to the stages' Provider requirements

**Unwind changes:**
- Add unwind_history_via_rocksdb_generic helper to reduce code duplication
- Stream changesets directly into a BTreeSet instead of collecting into a Vec first
  (see the sketch after this list)
- Add comprehensive test coverage for edge cases
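
The streaming change above amounts to the following pattern, shown here as a minimal,
self-contained sketch; the iterator and address types are simplified stand-ins for the
reth cursor walk and alloy_primitives::Address, not the actual API:

```rust
use std::collections::BTreeSet;

// Hypothetical stand-in for an address; reth uses alloy_primitives::Address.
type Address = [u8; 20];

/// Collect the set of addresses touched by a changeset walk without building an
/// intermediate Vec: the BTreeSet deduplicates and sorts entries as they stream in.
fn affected_addresses(changesets: impl Iterator<Item = (u64, Address)>) -> BTreeSet<Address> {
    changesets.map(|(_block, address)| address).collect()
}

fn main() {
    let walk = [(8u64, [1u8; 20]), (9, [1u8; 20]), (10, [2u8; 20])].into_iter();
    let affected = affected_addresses(walk);
    assert_eq!(affected.len(), 2); // duplicate addresses collapse as they stream in
}
```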

When the storages_history_in_rocksdb or account_history_in_rocksdb storage setting is
enabled, writes and unwinds for the corresponding history table are routed to RocksDB
instead of MDBX.
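
The routing decision itself is a single switch on the cached storage settings, taken once
per stage run. A minimal, self-contained sketch of that shape follows; the types are
illustrative stand-ins, not the actual EitherWriter or provider API:

```rust
use std::collections::BTreeMap;

/// Illustrative stand-in for the cached storage settings.
#[derive(Default)]
struct StorageSettings {
    account_history_in_rocksdb: bool,
}

/// Illustrative stand-in for EitherWriter: every write goes to exactly one backend.
enum HistoryWriter {
    /// Writes land directly in the transactional table.
    Mdbx(BTreeMap<u64, Vec<u64>>),
    /// Writes are buffered in a batch and committed at the provider level.
    RocksDbBatch(Vec<(u64, Vec<u64>)>),
}

impl HistoryWriter {
    /// Pick the backend once, up front, based on the settings.
    fn for_settings(settings: &StorageSettings) -> Self {
        if settings.account_history_in_rocksdb {
            Self::RocksDbBatch(Vec::new())
        } else {
            Self::Mdbx(BTreeMap::new())
        }
    }

    /// Callers write through one method and never branch on the backend again.
    fn put(&mut self, highest_block: u64, blocks: Vec<u64>) {
        match self {
            Self::Mdbx(table) => {
                table.insert(highest_block, blocks);
            }
            Self::RocksDbBatch(batch) => batch.push((highest_block, blocks)),
        }
    }
}

fn main() {
    let settings = StorageSettings { account_history_in_rocksdb: true };
    let mut writer = HistoryWriter::for_settings(&settings);
    writer.put(u64::MAX, vec![1, 2, 3]); // buffered; committed together with the provider
}
```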

Amp-Thread-ID: https://ampcode.com/threads/T-019b8ca2-143e-73fa-93d3-c806e9da7b92
Author: yongkangc
Date:   2026-01-05 05:34:43 +00:00
Parent: 3d4efdb271
Commit: 2284ede549

4 changed files with 1416 additions and 31 deletions

@@ -2,7 +2,12 @@ use super::{collect_history_indices, load_history_indices};
use alloy_primitives::Address;
use reth_config::config::{EtlConfig, IndexHistoryConfig};
use reth_db_api::{models::ShardedKey, table::Decode, tables, transaction::DbTxMut};
use reth_provider::{DBProvider, HistoryWriter, PruneCheckpointReader, PruneCheckpointWriter};
#[cfg(all(unix, feature = "rocksdb"))]
use reth_provider::EitherWriter;
use reth_provider::{
DBProvider, HistoryWriter, NodePrimitivesProvider, PruneCheckpointReader,
PruneCheckpointWriter, RocksDBProviderFactory, StorageSettingsCache,
};
use reth_prune_types::{PruneCheckpoint, PruneMode, PrunePurpose, PruneSegment};
use reth_stages_api::{
ExecInput, ExecOutput, Stage, StageCheckpoint, StageError, StageId, UnwindInput, UnwindOutput,
@@ -43,8 +48,13 @@ impl Default for IndexAccountHistoryStage {
impl<Provider> Stage<Provider> for IndexAccountHistoryStage
where
Provider:
DBProvider<Tx: DbTxMut> + HistoryWriter + PruneCheckpointReader + PruneCheckpointWriter,
Provider: DBProvider<Tx: DbTxMut>
+ HistoryWriter
+ PruneCheckpointReader
+ PruneCheckpointWriter
+ StorageSettingsCache
+ RocksDBProviderFactory
+ NodePrimitivesProvider,
{
/// Return the id of the stage
fn id(&self) -> StageId {
@@ -110,15 +120,55 @@ where
&self.etl_config,
)?;
info!(target: "sync::stages::index_account_history::exec", "Loading indices into database");
load_history_indices::<_, tables::AccountsHistory, _>(
provider,
collector,
first_sync,
ShardedKey::new,
ShardedKey::<Address>::decode_owned,
|key| key.key,
)?;
// Check if RocksDB is enabled for account history
#[cfg(all(unix, feature = "rocksdb"))]
let use_rocksdb = provider.cached_storage_settings().account_history_in_rocksdb;
#[cfg(not(all(unix, feature = "rocksdb")))]
let use_rocksdb = false;
info!(target: "sync::stages::index_account_history::exec", ?use_rocksdb, "Loading indices into database");
if use_rocksdb {
#[cfg(all(unix, feature = "rocksdb"))]
{
// Create RocksDB batch
let rocksdb = provider.rocksdb_provider();
let mut rocksdb_batch = rocksdb.batch();
if first_sync {
super::clear_rocksdb_table::<tables::AccountsHistory>(
&rocksdb,
&mut rocksdb_batch,
)?;
}
// Create writer that routes to RocksDB
let mut writer = EitherWriter::new_accounts_history(provider, rocksdb_batch)?;
// Load indices using the RocksDB path
super::load_account_history_indices_via_writer(
&mut writer,
collector,
first_sync,
provider,
)?;
// Extract and register RocksDB batch for commit at provider level
if let Some(batch) = writer.into_raw_rocksdb_batch() {
provider.set_pending_rocksdb_batch(batch);
}
}
} else {
// Keep existing MDBX path unchanged
load_history_indices::<_, tables::AccountsHistory, _>(
provider,
collector,
first_sync,
ShardedKey::new,
ShardedKey::<Address>::decode_owned,
|key| key.key,
)?;
}
Ok(ExecOutput { checkpoint: StageCheckpoint::new(*range.end()), done: true })
}
@@ -132,9 +182,40 @@ where
let (range, unwind_progress, _) =
input.unwind_block_range_with_threshold(self.commit_threshold);
provider.unwind_account_history_indices_range(range)?;
// Check if RocksDB is enabled for account history
#[cfg(all(unix, feature = "rocksdb"))]
let use_rocksdb = provider.cached_storage_settings().account_history_in_rocksdb;
#[cfg(not(all(unix, feature = "rocksdb")))]
let use_rocksdb = false;
if use_rocksdb {
#[cfg(all(unix, feature = "rocksdb"))]
{
use reth_db_api::{cursor::DbCursorRO, transaction::DbTx};
use std::collections::BTreeSet;
// Stream changesets directly into a set of affected addresses
let mut affected_addresses = BTreeSet::new();
for entry in provider
.tx_ref()
.cursor_read::<tables::AccountChangeSets>()?
.walk_range(range.clone())?
{
let (_block, account) = entry?;
affected_addresses.insert(account.address);
}
// Unwind using RocksDB
super::unwind_account_history_via_rocksdb(
provider,
affected_addresses,
*range.start(),
)?;
}
} else {
provider.unwind_account_history_indices_range(range)?;
}
// from HistoryIndex higher than that number.
Ok(UnwindOutput { checkpoint: StageCheckpoint::new(unwind_progress) })
}
}
@@ -632,4 +713,427 @@ mod tests {
Ok(())
}
}
#[cfg(all(unix, feature = "rocksdb"))]
mod rocksdb_tests {
use super::*;
use reth_provider::RocksDBProviderFactory;
use reth_storage_api::StorageSettings;
#[tokio::test]
async fn execute_writes_account_history_to_rocksdb_when_enabled() {
let db = TestStageDB::default();
// Enable RocksDB for account history
db.factory.set_storage_settings_cache(
StorageSettings::legacy().with_account_history_in_rocksdb(true),
);
// setup
partial_setup(&db);
// run
run(&db, MAX_BLOCK, None);
// Verify MDBX table is empty (data should be in RocksDB)
let mdbx_count = db.table::<tables::AccountsHistory>().unwrap().len();
assert_eq!(
mdbx_count, 0,
"MDBX AccountsHistory should be empty when RocksDB is enabled"
);
// Verify RocksDB has the data
let rocksdb = db.factory.rocksdb_provider();
let result = rocksdb.get::<tables::AccountsHistory>(shard(u64::MAX)).unwrap();
assert!(result.is_some(), "Account history should exist in RocksDB");
// Verify the block numbers are correct
let list = result.unwrap();
let blocks: Vec<u64> = list.iter().collect();
assert!(!blocks.is_empty(), "Block list should not be empty");
}
#[tokio::test]
async fn first_sync_clears_stale_rocksdb_account_history() {
let db = TestStageDB::default();
// Enable RocksDB for account history
db.factory.set_storage_settings_cache(
StorageSettings::legacy().with_account_history_in_rocksdb(true),
);
// Seed RocksDB with a stale entry for a different address
let rocksdb = db.factory.rocksdb_provider();
let stale_address = address!("0x0000000000000000000000000000000000000002");
let stale_key = ShardedKey { key: stale_address, highest_block_number: u64::MAX };
rocksdb.put::<tables::AccountsHistory>(stale_key.clone(), &list(&[999])).unwrap();
// setup
partial_setup(&db);
// run (first sync)
run(&db, MAX_BLOCK, None);
// Verify stale entry is removed
let rocksdb = db.factory.rocksdb_provider();
let stale = rocksdb.get::<tables::AccountsHistory>(stale_key).unwrap();
assert!(
stale.is_none(),
"Stale RocksDB account history should be cleared on first sync"
);
}
#[tokio::test]
async fn execute_writes_to_mdbx_when_rocksdb_disabled() {
let db = TestStageDB::default();
// Ensure RocksDB is disabled for account history (default)
db.factory.set_storage_settings_cache(
StorageSettings::legacy().with_account_history_in_rocksdb(false),
);
// setup
partial_setup(&db);
// run
run(&db, MAX_BLOCK, None);
// Verify MDBX table has data
let mdbx_count = db.table::<tables::AccountsHistory>().unwrap().len();
assert!(
mdbx_count > 0,
"MDBX AccountsHistory should have data when RocksDB is disabled"
);
}
/// Test incremental sync with RocksDB: run stage twice and verify indices are merged.
#[tokio::test]
async fn incremental_sync_merges_indices_in_rocksdb() {
let db = TestStageDB::default();
// Enable RocksDB for account history
db.factory.set_storage_settings_cache(
StorageSettings::legacy().with_account_history_in_rocksdb(true),
);
// First run: setup changesets for blocks 0-5 and index them
let first_run_end: u64 = 5;
db.commit(|tx| {
for block in 0..=first_run_end {
tx.put::<tables::BlockBodyIndices>(
block,
StoredBlockBodyIndices { tx_count: 3, ..Default::default() },
)?;
tx.put::<tables::AccountChangeSets>(block, acc())?;
}
Ok(())
})
.unwrap();
// Run stage for first batch (first_sync = true since checkpoint is 0)
run(&db, first_run_end, None);
// Verify first run wrote to RocksDB
let rocksdb = db.factory.rocksdb_provider();
let first_result = rocksdb.get::<tables::AccountsHistory>(shard(u64::MAX)).unwrap();
assert!(first_result.is_some(), "First run should write to RocksDB");
let first_blocks: Vec<u64> = first_result.unwrap().iter().collect();
assert_eq!(
first_blocks.len(),
(first_run_end + 1) as usize,
"First run should have blocks 0-5"
);
// Second run: add changesets for blocks 6-10
let second_run_end: u64 = 10;
db.commit(|tx| {
for block in (first_run_end + 1)..=second_run_end {
tx.put::<tables::BlockBodyIndices>(
block,
StoredBlockBodyIndices { tx_count: 3, ..Default::default() },
)?;
tx.put::<tables::AccountChangeSets>(block, acc())?;
}
Ok(())
})
.unwrap();
// Run stage for second batch (first_sync = false since checkpoint > 0)
run(&db, second_run_end, Some(first_run_end));
// Verify second run merged indices in RocksDB
let rocksdb = db.factory.rocksdb_provider();
let merged_result = rocksdb.get::<tables::AccountsHistory>(shard(u64::MAX)).unwrap();
assert!(merged_result.is_some(), "Second run should have data in RocksDB");
let merged_blocks: Vec<u64> = merged_result.unwrap().iter().collect();
// Should contain all blocks from 0 to 10
let expected_blocks: Vec<u64> = (0..=second_run_end).collect();
assert_eq!(
merged_blocks, expected_blocks,
"Incremental sync should merge all indices: expected {:?}, got {:?}",
expected_blocks, merged_blocks
);
// Verify MDBX table is still empty
let mdbx_count = db.table::<tables::AccountsHistory>().unwrap().len();
assert_eq!(mdbx_count, 0, "MDBX should remain empty when RocksDB is enabled");
}
/// Test unwind removes account history from RocksDB when enabled.
#[tokio::test]
async fn unwind_removes_account_history_from_rocksdb() {
let db = TestStageDB::default();
// Enable RocksDB for account history
db.factory.set_storage_settings_cache(
StorageSettings::legacy().with_account_history_in_rocksdb(true),
);
// setup changesets for blocks 0-10
db.commit(|tx| {
for block in 0..=10u64 {
tx.put::<tables::BlockBodyIndices>(
block,
StoredBlockBodyIndices { tx_count: 3, ..Default::default() },
)?;
tx.put::<tables::AccountChangeSets>(block, acc())?;
}
Ok(())
})
.unwrap();
// Run stage to index blocks 0-10
run(&db, 10, None);
// Verify RocksDB has all blocks
let rocksdb = db.factory.rocksdb_provider();
let before_unwind = rocksdb.get::<tables::AccountsHistory>(shard(u64::MAX)).unwrap();
assert!(before_unwind.is_some(), "RocksDB should have data before unwind");
let before_blocks: Vec<u64> = before_unwind.unwrap().iter().collect();
assert_eq!(before_blocks, (0..=10).collect::<Vec<_>>(), "Should have blocks 0-10");
// Unwind to block 5
unwind(&db, 10, 5);
// Verify RocksDB only contains blocks 0-5 (blocks <= unwind_to are kept)
let rocksdb = db.factory.rocksdb_provider();
let after_unwind = rocksdb.get::<tables::AccountsHistory>(shard(u64::MAX)).unwrap();
assert!(after_unwind.is_some(), "RocksDB should still have data after partial unwind");
let after_blocks: Vec<u64> = after_unwind.unwrap().iter().collect();
assert_eq!(
after_blocks,
(0..=5).collect::<Vec<_>>(),
"After unwind to 5, should have blocks 0-5"
);
// Verify MDBX table is still empty
let mdbx_count = db.table::<tables::AccountsHistory>().unwrap().len();
assert_eq!(mdbx_count, 0, "MDBX should remain empty during RocksDB unwind");
}
/// Test unwind to genesis removes all account history from RocksDB.
#[tokio::test]
async fn unwind_to_genesis_clears_rocksdb_account_history() {
let db = TestStageDB::default();
// Enable RocksDB for account history
db.factory.set_storage_settings_cache(
StorageSettings::legacy().with_account_history_in_rocksdb(true),
);
// setup changesets for blocks 0-5
db.commit(|tx| {
for block in 0..=5u64 {
tx.put::<tables::BlockBodyIndices>(
block,
StoredBlockBodyIndices { tx_count: 3, ..Default::default() },
)?;
tx.put::<tables::AccountChangeSets>(block, acc())?;
}
Ok(())
})
.unwrap();
// Run stage
run(&db, 5, None);
// Verify RocksDB has data
let rocksdb = db.factory.rocksdb_provider();
let before = rocksdb.get::<tables::AccountsHistory>(shard(u64::MAX)).unwrap();
assert!(before.is_some(), "RocksDB should have data before unwind");
// Unwind to genesis (block 0)
unwind(&db, 5, 0);
// Verify RocksDB still has block 0 (blocks <= unwind_to are kept)
let rocksdb = db.factory.rocksdb_provider();
let after = rocksdb.get::<tables::AccountsHistory>(shard(u64::MAX)).unwrap();
assert!(after.is_some(), "RocksDB should still have block 0 after unwind to genesis");
let after_blocks: Vec<u64> = after.unwrap().iter().collect();
assert_eq!(after_blocks, vec![0], "After unwind to 0, should only have block 0");
}
/// Test unwind with no changesets in range is a no-op.
#[tokio::test]
async fn unwind_with_no_changesets_is_noop() {
let db = TestStageDB::default();
// Enable RocksDB for account history
db.factory.set_storage_settings_cache(
StorageSettings::legacy().with_account_history_in_rocksdb(true),
);
// Setup changesets only for blocks 0-5 (leave gap at 6-10)
db.commit(|tx| {
for block in 0..=5u64 {
tx.put::<tables::BlockBodyIndices>(
block,
StoredBlockBodyIndices { tx_count: 3, ..Default::default() },
)?;
tx.put::<tables::AccountChangeSets>(block, acc())?;
}
// Add block body indices for 6-10 but NO changesets
for block in 6..=10u64 {
tx.put::<tables::BlockBodyIndices>(
block,
StoredBlockBodyIndices { tx_count: 3, ..Default::default() },
)?;
}
Ok(())
})
.unwrap();
// Run stage to index blocks 0-10
run(&db, 10, None);
// Record state before unwind
let rocksdb = db.factory.rocksdb_provider();
let before = rocksdb.get::<tables::AccountsHistory>(shard(u64::MAX)).unwrap();
let before_blocks: Vec<u64> = before.unwrap().iter().collect();
// Unwind from 10 to 7 (range 8-10 has no changesets)
unwind(&db, 10, 7);
// Verify RocksDB data is unchanged (no changesets in unwind range)
let rocksdb = db.factory.rocksdb_provider();
let after = rocksdb.get::<tables::AccountsHistory>(shard(u64::MAX)).unwrap();
let after_blocks: Vec<u64> = after.unwrap().iter().collect();
assert_eq!(
before_blocks, after_blocks,
"Data should be unchanged when no changesets in unwind range"
);
}
/// Test unwind preserves unrelated addresses.
#[tokio::test]
async fn unwind_preserves_unrelated_addresses() {
let db = TestStageDB::default();
// Enable RocksDB for account history
db.factory.set_storage_settings_cache(
StorageSettings::legacy().with_account_history_in_rocksdb(true),
);
let address_a = address!("0x0000000000000000000000000000000000000001");
let address_b = address!("0x0000000000000000000000000000000000000002");
// Setup: address_a has changes in blocks 0-10, address_b only in blocks 0-5
db.commit(|tx| {
for block in 0..=10u64 {
tx.put::<tables::BlockBodyIndices>(
block,
StoredBlockBodyIndices { tx_count: 3, ..Default::default() },
)?;
// address_a changes in all blocks
tx.put::<tables::AccountChangeSets>(
block,
AccountBeforeTx { address: address_a, info: None },
)?;
// address_b only changes in blocks 0-5
if block <= 5 {
tx.put::<tables::AccountChangeSets>(
block,
AccountBeforeTx { address: address_b, info: None },
)?;
}
}
Ok(())
})
.unwrap();
// Run stage
run(&db, 10, None);
// Record address_b state before unwind
let rocksdb = db.factory.rocksdb_provider();
let key_b = ShardedKey { key: address_b, highest_block_number: u64::MAX };
let before_b = rocksdb.get::<tables::AccountsHistory>(key_b.clone()).unwrap();
let before_b_blocks: Vec<u64> = before_b.unwrap().iter().collect();
// Unwind from 10 to 7 (only address_a has changes in 8-10)
unwind(&db, 10, 7);
// Verify address_b is unchanged (no changes in unwind range)
let rocksdb = db.factory.rocksdb_provider();
let after_b = rocksdb.get::<tables::AccountsHistory>(key_b).unwrap();
let after_b_blocks: Vec<u64> = after_b.unwrap().iter().collect();
assert_eq!(
before_b_blocks, after_b_blocks,
"Address B should be unchanged when it has no changes in unwind range"
);
// Verify address_a was properly unwound
let key_a = ShardedKey { key: address_a, highest_block_number: u64::MAX };
let after_a = rocksdb.get::<tables::AccountsHistory>(key_a).unwrap();
let after_a_blocks: Vec<u64> = after_a.unwrap().iter().collect();
assert_eq!(
after_a_blocks,
(0..=7).collect::<Vec<_>>(),
"Address A should have blocks 0-7 after unwind"
);
}
/// Test unwind deletes entry when all blocks are removed.
#[tokio::test]
async fn unwind_deletes_entry_when_all_blocks_removed() {
let db = TestStageDB::default();
// Enable RocksDB for account history
db.factory.set_storage_settings_cache(
StorageSettings::legacy().with_account_history_in_rocksdb(true),
);
// Setup: only add changesets for blocks 5-10
db.commit(|tx| {
for block in 0..=10u64 {
tx.put::<tables::BlockBodyIndices>(
block,
StoredBlockBodyIndices { tx_count: 3, ..Default::default() },
)?;
if block >= 5 {
tx.put::<tables::AccountChangeSets>(block, acc())?;
}
}
Ok(())
})
.unwrap();
// Run stage
run(&db, 10, None);
// Verify RocksDB has data
let rocksdb = db.factory.rocksdb_provider();
let before = rocksdb.get::<tables::AccountsHistory>(shard(u64::MAX)).unwrap();
assert!(before.is_some(), "RocksDB should have data before unwind");
// Unwind to block 4 (removes blocks 5-10, which is ALL the data)
unwind(&db, 10, 4);
// Verify entry is deleted (no blocks left)
let rocksdb = db.factory.rocksdb_provider();
let after = rocksdb.get::<tables::AccountsHistory>(shard(u64::MAX)).unwrap();
assert!(after.is_none(), "Entry should be deleted when all blocks are removed");
}
}
}

@@ -7,7 +7,12 @@ use reth_db_api::{
tables,
transaction::DbTxMut,
};
use reth_provider::{DBProvider, HistoryWriter, PruneCheckpointReader, PruneCheckpointWriter};
#[cfg(all(unix, feature = "rocksdb"))]
use reth_provider::EitherWriter;
use reth_provider::{
DBProvider, HistoryWriter, NodePrimitivesProvider, PruneCheckpointReader,
PruneCheckpointWriter, RocksDBProviderFactory, StorageSettingsCache,
};
use reth_prune_types::{PruneCheckpoint, PruneMode, PrunePurpose, PruneSegment};
use reth_stages_api::{ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput};
use std::fmt::Debug;
@@ -46,8 +51,13 @@ impl Default for IndexStorageHistoryStage {
impl<Provider> Stage<Provider> for IndexStorageHistoryStage
where
Provider:
DBProvider<Tx: DbTxMut> + PruneCheckpointWriter + HistoryWriter + PruneCheckpointReader,
Provider: DBProvider<Tx: DbTxMut>
+ PruneCheckpointWriter
+ HistoryWriter
+ PruneCheckpointReader
+ StorageSettingsCache
+ RocksDBProviderFactory
+ NodePrimitivesProvider,
{
/// Return the id of the stage
fn id(&self) -> StageId {
@@ -115,17 +125,57 @@ where
&self.etl_config,
)?;
info!(target: "sync::stages::index_storage_history::exec", "Loading indices into database");
load_history_indices::<_, tables::StoragesHistory, _>(
provider,
collector,
first_sync,
|AddressStorageKey((address, storage_key)), highest_block_number| {
StorageShardedKey::new(address, storage_key, highest_block_number)
},
StorageShardedKey::decode_owned,
|key| AddressStorageKey((key.address, key.sharded_key.key)),
)?;
// Check if RocksDB is enabled for storage history
#[cfg(all(unix, feature = "rocksdb"))]
let use_rocksdb = provider.cached_storage_settings().storages_history_in_rocksdb;
#[cfg(not(all(unix, feature = "rocksdb")))]
let use_rocksdb = false;
info!(target: "sync::stages::index_storage_history::exec", ?use_rocksdb, "Loading indices into database");
if use_rocksdb {
#[cfg(all(unix, feature = "rocksdb"))]
{
// Create RocksDB batch
let rocksdb = provider.rocksdb_provider();
let mut rocksdb_batch = rocksdb.batch();
if first_sync {
super::clear_rocksdb_table::<tables::StoragesHistory>(
&rocksdb,
&mut rocksdb_batch,
)?;
}
// Create writer that routes to RocksDB
let mut writer = EitherWriter::new_storages_history(provider, rocksdb_batch)?;
// Load indices using the RocksDB path
super::load_storage_history_indices_via_writer(
&mut writer,
collector,
first_sync,
provider,
)?;
// Extract and register RocksDB batch for commit at provider level
if let Some(batch) = writer.into_raw_rocksdb_batch() {
provider.set_pending_rocksdb_batch(batch);
}
}
} else {
// Keep existing MDBX path unchanged
load_history_indices::<_, tables::StoragesHistory, _>(
provider,
collector,
first_sync,
|AddressStorageKey((address, storage_key)), highest_block_number| {
StorageShardedKey::new(address, storage_key, highest_block_number)
},
StorageShardedKey::decode_owned,
|key| AddressStorageKey((key.address, key.sharded_key.key)),
)?;
}
Ok(ExecOutput { checkpoint: StageCheckpoint::new(*range.end()), done: true })
}
@@ -139,7 +189,35 @@ where
let (range, unwind_progress, _) =
input.unwind_block_range_with_threshold(self.commit_threshold);
provider.unwind_storage_history_indices_range(BlockNumberAddress::range(range))?;
// Check if RocksDB is enabled for storage history
#[cfg(all(unix, feature = "rocksdb"))]
let use_rocksdb = provider.cached_storage_settings().storages_history_in_rocksdb;
#[cfg(not(all(unix, feature = "rocksdb")))]
let use_rocksdb = false;
if use_rocksdb {
#[cfg(all(unix, feature = "rocksdb"))]
{
use reth_db_api::{cursor::DbCursorRO, transaction::DbTx};
use std::collections::BTreeSet;
// Stream changesets directly into a set of affected (address, storage_key) pairs
let mut affected_keys = BTreeSet::new();
for entry in provider
.tx_ref()
.cursor_read::<tables::StorageChangeSets>()?
.walk_range(BlockNumberAddress::range(range.clone()))?
{
let (block_address, storage_entry) = entry?;
affected_keys.insert((block_address.address(), storage_entry.key));
}
// Unwind using RocksDB
super::unwind_storage_history_via_rocksdb(provider, affected_keys, *range.start())?;
}
} else {
provider.unwind_storage_history_indices_range(BlockNumberAddress::range(range))?;
}
Ok(UnwindOutput { checkpoint: StageCheckpoint::new(unwind_progress) })
}
@@ -663,4 +741,456 @@ mod tests {
Ok(())
}
}
#[cfg(all(unix, feature = "rocksdb"))]
mod rocksdb_tests {
use super::*;
use reth_provider::RocksDBProviderFactory;
use reth_storage_api::StorageSettings;
#[tokio::test]
async fn execute_writes_storage_history_to_rocksdb_when_enabled() {
let db = TestStageDB::default();
// Enable RocksDB for storage history
db.factory.set_storage_settings_cache(
StorageSettings::legacy().with_storages_history_in_rocksdb(true),
);
// setup
partial_setup(&db);
// run
run(&db, MAX_BLOCK, None);
// Verify MDBX table is empty (data should be in RocksDB)
let mdbx_count = db.table::<tables::StoragesHistory>().unwrap().len();
assert_eq!(
mdbx_count, 0,
"MDBX StoragesHistory should be empty when RocksDB is enabled"
);
// Verify RocksDB has the data
let rocksdb = db.factory.rocksdb_provider();
let result = rocksdb.get::<tables::StoragesHistory>(shard(u64::MAX)).unwrap();
assert!(result.is_some(), "Storage history should exist in RocksDB");
// Verify the block numbers are correct
let list = result.unwrap();
let blocks: Vec<u64> = list.iter().collect();
assert!(!blocks.is_empty(), "Block list should not be empty");
}
#[tokio::test]
async fn first_sync_clears_stale_rocksdb_storage_history() {
let db = TestStageDB::default();
// Enable RocksDB for storage history
db.factory.set_storage_settings_cache(
StorageSettings::legacy().with_storages_history_in_rocksdb(true),
);
// Seed RocksDB with a stale entry for a different address/storage key
let rocksdb = db.factory.rocksdb_provider();
let stale_address = address!("0x0000000000000000000000000000000000000002");
let stale_storage_key =
b256!("0x0000000000000000000000000000000000000000000000000000000000000002");
let stale_key = StorageShardedKey {
address: stale_address,
sharded_key: ShardedKey { key: stale_storage_key, highest_block_number: u64::MAX },
};
rocksdb.put::<tables::StoragesHistory>(stale_key.clone(), &list(&[999])).unwrap();
// setup
partial_setup(&db);
// run (first sync)
run(&db, MAX_BLOCK, None);
// Verify stale entry is removed
let rocksdb = db.factory.rocksdb_provider();
let stale = rocksdb.get::<tables::StoragesHistory>(stale_key).unwrap();
assert!(
stale.is_none(),
"Stale RocksDB storage history should be cleared on first sync"
);
}
#[tokio::test]
async fn execute_writes_to_mdbx_when_rocksdb_disabled() {
let db = TestStageDB::default();
// Ensure RocksDB is disabled for storage history (default)
db.factory.set_storage_settings_cache(
StorageSettings::legacy().with_storages_history_in_rocksdb(false),
);
// setup
partial_setup(&db);
// run
run(&db, MAX_BLOCK, None);
// Verify MDBX table has data
let mdbx_count = db.table::<tables::StoragesHistory>().unwrap().len();
assert!(
mdbx_count > 0,
"MDBX StoragesHistory should have data when RocksDB is disabled"
);
}
/// Test incremental sync with RocksDB: run stage twice and verify indices are merged.
#[tokio::test]
async fn incremental_sync_merges_indices_in_rocksdb() {
let db = TestStageDB::default();
// Enable RocksDB for storage history
db.factory.set_storage_settings_cache(
StorageSettings::legacy().with_storages_history_in_rocksdb(true),
);
// First run: setup changesets for blocks 0-5 and index them
let first_run_end: u64 = 5;
db.commit(|tx| {
for block in 0..=first_run_end {
tx.put::<tables::BlockBodyIndices>(
block,
StoredBlockBodyIndices { tx_count: 3, ..Default::default() },
)?;
tx.put::<tables::StorageChangeSets>(
block_number_address(block),
storage(STORAGE_KEY),
)?;
}
Ok(())
})
.unwrap();
// Run stage for first batch (first_sync = true since checkpoint is 0)
run(&db, first_run_end, None);
// Verify first run wrote to RocksDB
let rocksdb = db.factory.rocksdb_provider();
let first_result = rocksdb.get::<tables::StoragesHistory>(shard(u64::MAX)).unwrap();
assert!(first_result.is_some(), "First run should write to RocksDB");
let first_blocks: Vec<u64> = first_result.unwrap().iter().collect();
assert_eq!(
first_blocks.len(),
(first_run_end + 1) as usize,
"First run should have blocks 0-5"
);
// Second run: add changesets for blocks 6-10
let second_run_end: u64 = 10;
db.commit(|tx| {
for block in (first_run_end + 1)..=second_run_end {
tx.put::<tables::BlockBodyIndices>(
block,
StoredBlockBodyIndices { tx_count: 3, ..Default::default() },
)?;
tx.put::<tables::StorageChangeSets>(
block_number_address(block),
storage(STORAGE_KEY),
)?;
}
Ok(())
})
.unwrap();
// Run stage for second batch (first_sync = false since checkpoint > 0)
run(&db, second_run_end, Some(first_run_end));
// Verify second run merged indices in RocksDB
let rocksdb = db.factory.rocksdb_provider();
let merged_result = rocksdb.get::<tables::StoragesHistory>(shard(u64::MAX)).unwrap();
assert!(merged_result.is_some(), "Second run should have data in RocksDB");
let merged_blocks: Vec<u64> = merged_result.unwrap().iter().collect();
// Should contain all blocks from 0 to 10
let expected_blocks: Vec<u64> = (0..=second_run_end).collect();
assert_eq!(
merged_blocks, expected_blocks,
"Incremental sync should merge all indices: expected {:?}, got {:?}",
expected_blocks, merged_blocks
);
// Verify MDBX table is still empty
let mdbx_count = db.table::<tables::StoragesHistory>().unwrap().len();
assert_eq!(mdbx_count, 0, "MDBX should remain empty when RocksDB is enabled");
}
/// Test unwind removes storage history from RocksDB when enabled.
#[tokio::test]
async fn unwind_removes_storage_history_from_rocksdb() {
let db = TestStageDB::default();
// Enable RocksDB for storage history
db.factory.set_storage_settings_cache(
StorageSettings::legacy().with_storages_history_in_rocksdb(true),
);
// setup changesets for blocks 0-10
db.commit(|tx| {
for block in 0..=10u64 {
tx.put::<tables::BlockBodyIndices>(
block,
StoredBlockBodyIndices { tx_count: 3, ..Default::default() },
)?;
tx.put::<tables::StorageChangeSets>(
block_number_address(block),
storage(STORAGE_KEY),
)?;
}
Ok(())
})
.unwrap();
// Run stage to index blocks 0-10
run(&db, 10, None);
// Verify RocksDB has all blocks
let rocksdb = db.factory.rocksdb_provider();
let before_unwind = rocksdb.get::<tables::StoragesHistory>(shard(u64::MAX)).unwrap();
assert!(before_unwind.is_some(), "RocksDB should have data before unwind");
let before_blocks: Vec<u64> = before_unwind.unwrap().iter().collect();
assert_eq!(before_blocks, (0..=10).collect::<Vec<_>>(), "Should have blocks 0-10");
// Unwind to block 5
unwind(&db, 10, 5);
// Verify RocksDB only contains blocks 0-5 (blocks <= unwind_to are kept)
let rocksdb = db.factory.rocksdb_provider();
let after_unwind = rocksdb.get::<tables::StoragesHistory>(shard(u64::MAX)).unwrap();
assert!(after_unwind.is_some(), "RocksDB should still have data after partial unwind");
let after_blocks: Vec<u64> = after_unwind.unwrap().iter().collect();
assert_eq!(
after_blocks,
(0..=5).collect::<Vec<_>>(),
"After unwind to 5, should have blocks 0-5"
);
// Verify MDBX table is still empty
let mdbx_count = db.table::<tables::StoragesHistory>().unwrap().len();
assert_eq!(mdbx_count, 0, "MDBX should remain empty during RocksDB unwind");
}
/// Test unwind to genesis removes all storage history from RocksDB.
#[tokio::test]
async fn unwind_to_genesis_clears_rocksdb_storage_history() {
let db = TestStageDB::default();
// Enable RocksDB for storage history
db.factory.set_storage_settings_cache(
StorageSettings::legacy().with_storages_history_in_rocksdb(true),
);
// setup changesets for blocks 0-5
db.commit(|tx| {
for block in 0..=5u64 {
tx.put::<tables::BlockBodyIndices>(
block,
StoredBlockBodyIndices { tx_count: 3, ..Default::default() },
)?;
tx.put::<tables::StorageChangeSets>(
block_number_address(block),
storage(STORAGE_KEY),
)?;
}
Ok(())
})
.unwrap();
// Run stage
run(&db, 5, None);
// Verify RocksDB has data
let rocksdb = db.factory.rocksdb_provider();
let before = rocksdb.get::<tables::StoragesHistory>(shard(u64::MAX)).unwrap();
assert!(before.is_some(), "RocksDB should have data before unwind");
// Unwind to genesis (block 0)
unwind(&db, 5, 0);
// Verify RocksDB still has block 0 (blocks <= unwind_to are kept)
let rocksdb = db.factory.rocksdb_provider();
let after = rocksdb.get::<tables::StoragesHistory>(shard(u64::MAX)).unwrap();
assert!(after.is_some(), "RocksDB should still have block 0 after unwind to genesis");
let after_blocks: Vec<u64> = after.unwrap().iter().collect();
assert_eq!(after_blocks, vec![0], "After unwind to 0, should only have block 0");
}
/// Test unwind with no changesets in range is a no-op.
#[tokio::test]
async fn unwind_with_no_changesets_is_noop() {
let db = TestStageDB::default();
// Enable RocksDB for storage history
db.factory.set_storage_settings_cache(
StorageSettings::legacy().with_storages_history_in_rocksdb(true),
);
// Setup changesets only for blocks 0-5 (leave gap at 6-10)
db.commit(|tx| {
for block in 0..=5u64 {
tx.put::<tables::BlockBodyIndices>(
block,
StoredBlockBodyIndices { tx_count: 3, ..Default::default() },
)?;
tx.put::<tables::StorageChangeSets>(
block_number_address(block),
storage(STORAGE_KEY),
)?;
}
// Add block body indices for 6-10 but NO changesets
for block in 6..=10u64 {
tx.put::<tables::BlockBodyIndices>(
block,
StoredBlockBodyIndices { tx_count: 3, ..Default::default() },
)?;
}
Ok(())
})
.unwrap();
// Run stage to index blocks 0-10
run(&db, 10, None);
// Record state before unwind
let rocksdb = db.factory.rocksdb_provider();
let before = rocksdb.get::<tables::StoragesHistory>(shard(u64::MAX)).unwrap();
let before_blocks: Vec<u64> = before.unwrap().iter().collect();
// Unwind from 10 to 7 (range 8-10 has no changesets)
unwind(&db, 10, 7);
// Verify RocksDB data is unchanged (no changesets in unwind range)
let rocksdb = db.factory.rocksdb_provider();
let after = rocksdb.get::<tables::StoragesHistory>(shard(u64::MAX)).unwrap();
let after_blocks: Vec<u64> = after.unwrap().iter().collect();
assert_eq!(
before_blocks, after_blocks,
"Data should be unchanged when no changesets in unwind range"
);
}
/// Test unwind preserves unrelated storage keys.
#[tokio::test]
async fn unwind_preserves_unrelated_storage_keys() {
let db = TestStageDB::default();
// Enable RocksDB for storage history
db.factory.set_storage_settings_cache(
StorageSettings::legacy().with_storages_history_in_rocksdb(true),
);
let key_1 = B256::with_last_byte(1);
let key_2 = B256::with_last_byte(2);
// Setup: key_1 has changes in blocks 0-10, key_2 only in blocks 0-5
db.commit(|tx| {
for block in 0..=10u64 {
tx.put::<tables::BlockBodyIndices>(
block,
StoredBlockBodyIndices { tx_count: 3, ..Default::default() },
)?;
// key_1 changes in all blocks
tx.put::<tables::StorageChangeSets>(
block_number_address(block),
storage(key_1),
)?;
// key_2 only changes in blocks 0-5
if block <= 5 {
tx.put::<tables::StorageChangeSets>(
block_number_address(block),
storage(key_2),
)?;
}
}
Ok(())
})
.unwrap();
// Run stage
run(&db, 10, None);
// Record key_2 state before unwind
let rocksdb = db.factory.rocksdb_provider();
let shard_key_2 = StorageShardedKey {
address: ADDRESS,
sharded_key: ShardedKey { key: key_2, highest_block_number: u64::MAX },
};
let before_2 = rocksdb.get::<tables::StoragesHistory>(shard_key_2.clone()).unwrap();
let before_2_blocks: Vec<u64> = before_2.unwrap().iter().collect();
// Unwind from 10 to 7 (only key_1 has changes in 8-10)
unwind(&db, 10, 7);
// Verify key_2 is unchanged (no changes in unwind range)
let rocksdb = db.factory.rocksdb_provider();
let after_2 = rocksdb.get::<tables::StoragesHistory>(shard_key_2).unwrap();
let after_2_blocks: Vec<u64> = after_2.unwrap().iter().collect();
assert_eq!(
before_2_blocks, after_2_blocks,
"Key 2 should be unchanged when it has no changes in unwind range"
);
// Verify key_1 was properly unwound
let shard_key_1 = StorageShardedKey {
address: ADDRESS,
sharded_key: ShardedKey { key: key_1, highest_block_number: u64::MAX },
};
let after_1 = rocksdb.get::<tables::StoragesHistory>(shard_key_1).unwrap();
let after_1_blocks: Vec<u64> = after_1.unwrap().iter().collect();
assert_eq!(
after_1_blocks,
(0..=7).collect::<Vec<_>>(),
"Key 1 should have blocks 0-7 after unwind"
);
}
/// Test unwind deletes entry when all blocks are removed.
#[tokio::test]
async fn unwind_deletes_entry_when_all_blocks_removed() {
let db = TestStageDB::default();
// Enable RocksDB for storage history
db.factory.set_storage_settings_cache(
StorageSettings::legacy().with_storages_history_in_rocksdb(true),
);
// Setup: only add changesets for blocks 5-10
db.commit(|tx| {
for block in 0..=10u64 {
tx.put::<tables::BlockBodyIndices>(
block,
StoredBlockBodyIndices { tx_count: 3, ..Default::default() },
)?;
if block >= 5 {
tx.put::<tables::StorageChangeSets>(
block_number_address(block),
storage(STORAGE_KEY),
)?;
}
}
Ok(())
})
.unwrap();
// Run stage
run(&db, 10, None);
// Verify RocksDB has data
let rocksdb = db.factory.rocksdb_provider();
let before = rocksdb.get::<tables::StoragesHistory>(shard(u64::MAX)).unwrap();
assert!(before.is_some(), "RocksDB should have data before unwind");
// Unwind to block 4 (removes blocks 5-10, which is ALL the data)
unwind(&db, 10, 4);
// Verify entry is deleted (no blocks left)
let rocksdb = db.factory.rocksdb_provider();
let after = rocksdb.get::<tables::StoragesHistory>(shard(u64::MAX)).unwrap();
assert!(after.is_none(), "Entry should be deleted when all blocks are removed");
}
}
}

@@ -9,6 +9,10 @@ use reth_db_api::{
BlockNumberList, DatabaseError,
};
use reth_etl::Collector;
#[cfg(all(unix, feature = "rocksdb"))]
use reth_provider::providers::{RocksDBBatch, RocksDBProvider};
#[cfg(all(unix, feature = "rocksdb"))]
use reth_provider::EitherWriter;
use reth_provider::{
providers::StaticFileProvider, BlockReader, DBProvider, ProviderError,
StaticFileProviderFactory,
@@ -247,6 +251,331 @@ impl LoadMode {
}
}
/// Deletes all entries in a `RocksDB` table by enqueuing delete operations in the provided batch.
#[cfg(all(unix, feature = "rocksdb"))]
pub(crate) fn clear_rocksdb_table<T: Table>(
rocksdb: &RocksDBProvider,
batch: &mut RocksDBBatch<'_>,
) -> Result<(), StageError> {
for entry in rocksdb.iter::<T>()? {
let (key, _value) = entry?;
batch.delete::<T>(key)?;
}
Ok(())
}
/// Load storage history indices through an [`EitherWriter`] which routes writes to either
/// database or `RocksDB`.
///
/// This is similar to [`load_history_indices`] but uses `EitherWriter` for flexible storage
/// backend routing. The sharding logic is identical: indices are grouped by partial keys,
/// shards are created when reaching `NUM_OF_INDICES_IN_SHARD`, and the last shard uses
/// `u64::MAX` as the highest block number sentinel.
///
/// Note: For `RocksDB`, when `append_only == false`, this function reads existing shards
/// from `RocksDB` and merges them with new indices, matching the MDBX behavior.
#[cfg(all(unix, feature = "rocksdb"))]
pub(crate) fn load_storage_history_indices_via_writer<CURSOR, N, Provider>(
writer: &mut EitherWriter<'_, CURSOR, N>,
mut collector: reth_etl::Collector<
reth_db_api::models::storage_sharded_key::StorageShardedKey,
BlockNumberList,
>,
append_only: bool,
rocksdb_provider: &Provider,
) -> Result<(), StageError>
where
CURSOR: DbCursorRW<reth_db_api::tables::StoragesHistory>
+ DbCursorRO<reth_db_api::tables::StoragesHistory>,
N: reth_primitives_traits::NodePrimitives,
Provider: reth_provider::RocksDBProviderFactory,
{
use reth_db_api::{models::storage_sharded_key::StorageShardedKey, table::Decode};
type PartialKey = (alloy_primitives::Address, alloy_primitives::B256);
let mut current_partial = PartialKey::default();
let mut current_list = Vec::<u64>::new();
let total_entries = collector.len();
let interval = (total_entries / 10).max(1);
for (index, element) in collector.iter()?.enumerate() {
let (k, v) = element?;
let sharded_key = StorageShardedKey::decode_owned(k)?;
let new_list = BlockNumberList::decompress_owned(v)?;
if index > 0 && index.is_multiple_of(interval) && total_entries > 10 {
info!(target: "sync::stages::index_storage_history", progress = %format!("{:.2}%", (index as f64 / total_entries as f64) * 100.0), "Writing indices via EitherWriter");
}
let partial_key: PartialKey = (sharded_key.address, sharded_key.sharded_key.key);
if current_partial != partial_key {
flush_storage_shards(writer, current_partial, &mut current_list, append_only, true)?;
current_partial = partial_key;
current_list.clear();
// If it's not the first sync, there might be an existing shard already in RocksDB,
// so we need to merge it with the one coming from the collector
if !append_only {
let rocksdb = rocksdb_provider.rocksdb_provider();
let key = StorageShardedKey::new(partial_key.0, partial_key.1, u64::MAX);
if let Some(existing_list) =
rocksdb.get::<reth_db_api::tables::StoragesHistory>(key)?
{
current_list.extend(existing_list.iter());
}
}
}
current_list.extend(new_list.iter());
flush_storage_shards(writer, current_partial, &mut current_list, append_only, false)?;
}
flush_storage_shards(writer, current_partial, &mut current_list, append_only, true)?;
Ok(())
}
#[cfg(all(unix, feature = "rocksdb"))]
fn flush_storage_shards<CURSOR, N>(
writer: &mut EitherWriter<'_, CURSOR, N>,
partial_key: (alloy_primitives::Address, alloy_primitives::B256),
list: &mut Vec<BlockNumber>,
append_only: bool,
flush: bool,
) -> Result<(), StageError>
where
CURSOR: DbCursorRW<reth_db_api::tables::StoragesHistory>
+ DbCursorRO<reth_db_api::tables::StoragesHistory>,
N: reth_primitives_traits::NodePrimitives,
{
use reth_db_api::models::storage_sharded_key::StorageShardedKey;
if list.len() > NUM_OF_INDICES_IN_SHARD || flush {
let chunks =
list.chunks(NUM_OF_INDICES_IN_SHARD).map(|c| c.to_vec()).collect::<Vec<Vec<u64>>>();
let mut iter = chunks.into_iter().peekable();
while let Some(chunk) = iter.next() {
let mut highest = *chunk.last().expect("at least one index");
if !flush && iter.peek().is_none() {
*list = chunk;
} else {
if iter.peek().is_none() {
highest = u64::MAX;
}
let key = StorageShardedKey::new(partial_key.0, partial_key.1, highest);
let value = BlockNumberList::new_pre_sorted(chunk);
writer.put_storage_history(key, &value, append_only)?;
}
}
}
Ok(())
}
/// Load account history indices through an [`EitherWriter`] which routes writes to either
/// database or `RocksDB`.
///
/// Note: For `RocksDB`, when `append_only == false`, this function reads existing shards
/// from `RocksDB` and merges them with new indices, matching the MDBX behavior.
#[cfg(all(unix, feature = "rocksdb"))]
pub(crate) fn load_account_history_indices_via_writer<CURSOR, N, Provider>(
writer: &mut EitherWriter<'_, CURSOR, N>,
mut collector: reth_etl::Collector<
reth_db_api::models::ShardedKey<alloy_primitives::Address>,
BlockNumberList,
>,
append_only: bool,
rocksdb_provider: &Provider,
) -> Result<(), StageError>
where
CURSOR: DbCursorRW<reth_db_api::tables::AccountsHistory>
+ DbCursorRO<reth_db_api::tables::AccountsHistory>,
N: reth_primitives_traits::NodePrimitives,
Provider: reth_provider::RocksDBProviderFactory,
{
use reth_db_api::{models::ShardedKey, table::Decode};
let mut current_partial = alloy_primitives::Address::default();
let mut current_list = Vec::<u64>::new();
let total_entries = collector.len();
let interval = (total_entries / 10).max(1);
for (index, element) in collector.iter()?.enumerate() {
let (k, v) = element?;
let sharded_key = ShardedKey::<alloy_primitives::Address>::decode_owned(k)?;
let new_list = BlockNumberList::decompress_owned(v)?;
if index > 0 && index.is_multiple_of(interval) && total_entries > 10 {
info!(target: "sync::stages::index_account_history", progress = %format!("{:.2}%", (index as f64 / total_entries as f64) * 100.0), "Writing indices via EitherWriter");
}
let partial_key = sharded_key.key;
if current_partial != partial_key {
flush_account_shards(writer, current_partial, &mut current_list, append_only, true)?;
current_partial = partial_key;
current_list.clear();
// If it's not the first sync, there might be an existing shard already in RocksDB,
// so we need to merge it with the one coming from the collector
if !append_only {
let rocksdb = rocksdb_provider.rocksdb_provider();
let key = ShardedKey::new(partial_key, u64::MAX);
if let Some(existing_list) =
rocksdb.get::<reth_db_api::tables::AccountsHistory>(key)?
{
current_list.extend(existing_list.iter());
}
}
}
current_list.extend(new_list.iter());
flush_account_shards(writer, current_partial, &mut current_list, append_only, false)?;
}
flush_account_shards(writer, current_partial, &mut current_list, append_only, true)?;
Ok(())
}
#[cfg(all(unix, feature = "rocksdb"))]
fn flush_account_shards<CURSOR, N>(
writer: &mut EitherWriter<'_, CURSOR, N>,
partial_key: alloy_primitives::Address,
list: &mut Vec<BlockNumber>,
append_only: bool,
flush: bool,
) -> Result<(), StageError>
where
CURSOR: DbCursorRW<reth_db_api::tables::AccountsHistory>
+ DbCursorRO<reth_db_api::tables::AccountsHistory>,
N: reth_primitives_traits::NodePrimitives,
{
use reth_db_api::models::ShardedKey;
if list.len() > NUM_OF_INDICES_IN_SHARD || flush {
let chunks =
list.chunks(NUM_OF_INDICES_IN_SHARD).map(|c| c.to_vec()).collect::<Vec<Vec<u64>>>();
let mut iter = chunks.into_iter().peekable();
while let Some(chunk) = iter.next() {
let mut highest = *chunk.last().expect("at least one index");
if !flush && iter.peek().is_none() {
*list = chunk;
} else {
if iter.peek().is_none() {
highest = u64::MAX;
}
let key = ShardedKey::new(partial_key, highest);
let value = BlockNumberList::new_pre_sorted(chunk);
writer.put_account_history(key, &value, append_only)?;
}
}
}
Ok(())
}
/// Generic helper to unwind history indices using `RocksDB`.
///
/// For each affected partial key:
/// 1. Reads the existing shard from `RocksDB` (the one with `u64::MAX` sentinel)
/// 2. Filters out block numbers >= `first_block_to_remove`
/// 3. If the result is empty, deletes the entry
/// 4. Otherwise, writes back the truncated shard
///
/// Note: This only handles the sentinel shard (`u64::MAX`). This is correct for typical
/// unwind operations where recently-added blocks are always in the sentinel shard.
#[cfg(all(unix, feature = "rocksdb"))]
fn unwind_history_via_rocksdb_generic<Provider, T, PK, K>(
provider: &Provider,
affected_keys: impl IntoIterator<Item = PK>,
first_block_to_remove: BlockNumber,
mk_sentinel_key: impl Fn(PK) -> K,
) -> Result<usize, StageError>
where
Provider: DBProvider + reth_provider::RocksDBProviderFactory,
T: reth_db_api::table::Table<Key = K, Value = BlockNumberList>,
K: Clone,
{
let rocksdb = provider.rocksdb_provider();
let mut batch = rocksdb.batch();
let mut count = 0;
for pk in affected_keys {
let key = mk_sentinel_key(pk);
if let Some(existing_list) = rocksdb.get::<T>(key.clone())? {
// Keep blocks < first_block_to_remove (matches MDBX semantics)
let filtered: Vec<u64> =
existing_list.iter().filter(|&block| block < first_block_to_remove).collect();
if filtered.is_empty() {
batch.delete::<T>(key)?;
} else {
let new_list = BlockNumberList::new_pre_sorted(filtered);
batch.put::<T>(key, &new_list)?;
}
count += 1;
}
}
provider.set_pending_rocksdb_batch(batch.into_inner());
Ok(count)
}
/// Unwind storage history indices using `RocksDB`.
///
/// Takes a set of affected (address, `storage_key`) pairs and removes all block numbers
/// >= `first_block_to_remove` from the history indices.
#[cfg(all(unix, feature = "rocksdb"))]
pub(crate) fn unwind_storage_history_via_rocksdb<Provider>(
provider: &Provider,
affected_keys: std::collections::BTreeSet<(alloy_primitives::Address, alloy_primitives::B256)>,
first_block_to_remove: BlockNumber,
) -> Result<usize, StageError>
where
Provider: DBProvider + reth_provider::RocksDBProviderFactory,
{
use reth_db_api::models::storage_sharded_key::StorageShardedKey;
unwind_history_via_rocksdb_generic::<_, reth_db_api::tables::StoragesHistory, _, _>(
provider,
affected_keys,
first_block_to_remove,
|(address, storage_key)| StorageShardedKey::new(address, storage_key, u64::MAX),
)
}
/// Unwind account history indices using `RocksDB`.
///
/// Takes a set of affected addresses and removes all block numbers
/// >= `first_block_to_remove` from the history indices.
#[cfg(all(unix, feature = "rocksdb"))]
pub(crate) fn unwind_account_history_via_rocksdb<Provider>(
provider: &Provider,
affected_addresses: std::collections::BTreeSet<alloy_primitives::Address>,
first_block_to_remove: BlockNumber,
) -> Result<usize, StageError>
where
Provider: DBProvider + reth_provider::RocksDBProviderFactory,
{
use reth_db_api::models::ShardedKey;
unwind_history_via_rocksdb_generic::<_, reth_db_api::tables::AccountsHistory, _, _>(
provider,
affected_addresses,
first_block_to_remove,
|address| ShardedKey::new(address, u64::MAX),
)
}
/// Called when database is ahead of static files. Attempts to find the first block we are missing
/// transactions for.
pub(crate) fn missing_static_data_error<Provider>(

@@ -364,13 +364,24 @@ where
CURSOR: DbCursorRW<tables::StoragesHistory> + DbCursorRO<tables::StoragesHistory>,
{
/// Puts a storage history entry.
///
/// When `append_only` is true, uses `cursor.append()`, which is significantly faster but
/// requires keys to be written in sorted order past the end of the table (as during a
/// first sync into an empty table).
/// When false, uses `cursor.upsert()`, which handles arbitrary insertion order.
pub fn put_storage_history(
&mut self,
key: StorageShardedKey,
value: &BlockNumberList,
append_only: bool,
) -> ProviderResult<()> {
match self {
Self::Database(cursor) => Ok(cursor.upsert(key, value)?),
Self::Database(cursor) => {
if append_only {
Ok(cursor.append(key, value)?)
} else {
Ok(cursor.upsert(key, value)?)
}
}
Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
#[cfg(all(unix, feature = "rocksdb"))]
Self::RocksDB(batch) => batch.put::<tables::StoragesHistory>(key, value),
@@ -398,13 +409,24 @@ where
CURSOR: DbCursorRW<tables::AccountsHistory> + DbCursorRO<tables::AccountsHistory>,
{
/// Puts an account history entry.
///
/// When `append_only` is true, uses `cursor.append()`, which is significantly faster but
/// requires keys to be written in sorted order past the end of the table (as during a
/// first sync into an empty table).
/// When false, uses `cursor.upsert()`, which handles arbitrary insertion order.
pub fn put_account_history(
&mut self,
key: ShardedKey<Address>,
value: &BlockNumberList,
append_only: bool,
) -> ProviderResult<()> {
match self {
Self::Database(cursor) => Ok(cursor.upsert(key, value)?),
Self::Database(cursor) => {
if append_only {
Ok(cursor.append(key, value)?)
} else {
Ok(cursor.upsert(key, value)?)
}
}
Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
#[cfg(all(unix, feature = "rocksdb"))]
Self::RocksDB(batch) => batch.put::<tables::AccountsHistory>(key, value),