mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-01-07 22:43:56 -05:00
test: simplify RocksDB history indexing tests
Consolidate 8 tests per stage into 3 essential tests each: - execute_writes_to_rocksdb: verifies basic execute routing - incremental_sync_merges_indices: critical merge behavior - unwind_removes_indices: verifies unwind routing Removed redundant edge case tests that duplicate MDBX test coverage. Added helper functions and module-level docstrings for clarity. Reduces test code by ~757 lines while maintaining coverage of critical RocksDB-specific behaviors. Amp-Thread-ID: https://ampcode.com/threads/T-019b8ca2-143e-73fa-93d3-c806e9da7b92 Co-authored-by: Amp <amp@ampcode.com>
This commit is contained in:
@@ -714,112 +714,25 @@ mod tests {
|
||||
}
|
||||
}
|
||||
|
||||
/// RocksDB-specific tests for account history indexing.
|
||||
///
|
||||
/// These tests verify that when `account_history_in_rocksdb` is enabled:
|
||||
/// - Execute writes indices to RocksDB instead of MDBX
|
||||
/// - Incremental syncs properly merge with existing RocksDB data
|
||||
/// - Unwind correctly removes indices from RocksDB
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
mod rocksdb_tests {
|
||||
use super::*;
|
||||
use reth_provider::RocksDBProviderFactory;
|
||||
use reth_storage_api::StorageSettings;
|
||||
|
||||
#[tokio::test]
|
||||
async fn execute_writes_account_history_to_rocksdb_when_enabled() {
|
||||
let db = TestStageDB::default();
|
||||
|
||||
// Enable RocksDB for account history
|
||||
/// Helper to setup RocksDB-enabled test environment with changesets.
|
||||
fn setup_with_changesets(db: &TestStageDB, block_range: std::ops::RangeInclusive<u64>) {
|
||||
db.factory.set_storage_settings_cache(
|
||||
StorageSettings::legacy().with_account_history_in_rocksdb(true),
|
||||
);
|
||||
|
||||
// setup
|
||||
partial_setup(&db);
|
||||
|
||||
// run
|
||||
run(&db, MAX_BLOCK, None);
|
||||
|
||||
// Verify MDBX table is empty (data should be in RocksDB)
|
||||
let mdbx_count = db.table::<tables::AccountsHistory>().unwrap().len();
|
||||
assert_eq!(
|
||||
mdbx_count, 0,
|
||||
"MDBX AccountsHistory should be empty when RocksDB is enabled"
|
||||
);
|
||||
|
||||
// Verify RocksDB has the data
|
||||
let rocksdb = db.factory.rocksdb_provider();
|
||||
let result = rocksdb.get::<tables::AccountsHistory>(shard(u64::MAX)).unwrap();
|
||||
assert!(result.is_some(), "Account history should exist in RocksDB");
|
||||
|
||||
// Verify the block numbers are correct
|
||||
let list = result.unwrap();
|
||||
let blocks: Vec<u64> = list.iter().collect();
|
||||
assert!(!blocks.is_empty(), "Block list should not be empty");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn first_sync_clears_stale_rocksdb_account_history() {
|
||||
let db = TestStageDB::default();
|
||||
|
||||
// Enable RocksDB for account history
|
||||
db.factory.set_storage_settings_cache(
|
||||
StorageSettings::legacy().with_account_history_in_rocksdb(true),
|
||||
);
|
||||
|
||||
// Seed RocksDB with a stale entry for a different address
|
||||
let rocksdb = db.factory.rocksdb_provider();
|
||||
let stale_address = address!("0x0000000000000000000000000000000000000002");
|
||||
let stale_key = ShardedKey { key: stale_address, highest_block_number: u64::MAX };
|
||||
rocksdb.put::<tables::AccountsHistory>(stale_key.clone(), &list(&[999])).unwrap();
|
||||
|
||||
// setup
|
||||
partial_setup(&db);
|
||||
|
||||
// run (first sync)
|
||||
run(&db, MAX_BLOCK, None);
|
||||
|
||||
// Verify stale entry is removed
|
||||
let rocksdb = db.factory.rocksdb_provider();
|
||||
let stale = rocksdb.get::<tables::AccountsHistory>(stale_key).unwrap();
|
||||
assert!(
|
||||
stale.is_none(),
|
||||
"Stale RocksDB account history should be cleared on first sync"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn execute_writes_to_mdbx_when_rocksdb_disabled() {
|
||||
let db = TestStageDB::default();
|
||||
|
||||
// Ensure RocksDB is disabled for account history (default)
|
||||
db.factory.set_storage_settings_cache(
|
||||
StorageSettings::legacy().with_account_history_in_rocksdb(false),
|
||||
);
|
||||
|
||||
// setup
|
||||
partial_setup(&db);
|
||||
|
||||
// run
|
||||
run(&db, MAX_BLOCK, None);
|
||||
|
||||
// Verify MDBX table has data
|
||||
let mdbx_count = db.table::<tables::AccountsHistory>().unwrap().len();
|
||||
assert!(
|
||||
mdbx_count > 0,
|
||||
"MDBX AccountsHistory should have data when RocksDB is disabled"
|
||||
);
|
||||
}
|
||||
|
||||
/// Test incremental sync with RocksDB: run stage twice and verify indices are merged.
|
||||
#[tokio::test]
|
||||
async fn incremental_sync_merges_indices_in_rocksdb() {
|
||||
let db = TestStageDB::default();
|
||||
|
||||
// Enable RocksDB for account history
|
||||
db.factory.set_storage_settings_cache(
|
||||
StorageSettings::legacy().with_account_history_in_rocksdb(true),
|
||||
);
|
||||
|
||||
// First run: setup changesets for blocks 0-5 and index them
|
||||
let first_run_end: u64 = 5;
|
||||
db.commit(|tx| {
|
||||
for block in 0..=first_run_end {
|
||||
for block in block_range {
|
||||
tx.put::<tables::BlockBodyIndices>(
|
||||
block,
|
||||
StoredBlockBodyIndices { tx_count: 3, ..Default::default() },
|
||||
@@ -829,311 +742,75 @@ mod tests {
|
||||
Ok(())
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
// Run stage for first batch (first_sync = true since checkpoint is 0)
|
||||
run(&db, first_run_end, None);
|
||||
|
||||
// Verify first run wrote to RocksDB
|
||||
let rocksdb = db.factory.rocksdb_provider();
|
||||
let first_result = rocksdb.get::<tables::AccountsHistory>(shard(u64::MAX)).unwrap();
|
||||
assert!(first_result.is_some(), "First run should write to RocksDB");
|
||||
let first_blocks: Vec<u64> = first_result.unwrap().iter().collect();
|
||||
assert_eq!(
|
||||
first_blocks.len(),
|
||||
(first_run_end + 1) as usize,
|
||||
"First run should have blocks 0-5"
|
||||
);
|
||||
|
||||
// Second run: add changesets for blocks 6-10
|
||||
let second_run_end: u64 = 10;
|
||||
db.commit(|tx| {
|
||||
for block in (first_run_end + 1)..=second_run_end {
|
||||
tx.put::<tables::BlockBodyIndices>(
|
||||
block,
|
||||
StoredBlockBodyIndices { tx_count: 3, ..Default::default() },
|
||||
)?;
|
||||
tx.put::<tables::AccountChangeSets>(block, acc())?;
|
||||
}
|
||||
Ok(())
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
// Run stage for second batch (first_sync = false since checkpoint > 0)
|
||||
run(&db, second_run_end, Some(first_run_end));
|
||||
|
||||
// Verify second run merged indices in RocksDB
|
||||
let rocksdb = db.factory.rocksdb_provider();
|
||||
let merged_result = rocksdb.get::<tables::AccountsHistory>(shard(u64::MAX)).unwrap();
|
||||
assert!(merged_result.is_some(), "Second run should have data in RocksDB");
|
||||
let merged_blocks: Vec<u64> = merged_result.unwrap().iter().collect();
|
||||
|
||||
// Should contain all blocks from 0 to 10
|
||||
let expected_blocks: Vec<u64> = (0..=second_run_end).collect();
|
||||
assert_eq!(
|
||||
merged_blocks, expected_blocks,
|
||||
"Incremental sync should merge all indices: expected {:?}, got {:?}",
|
||||
expected_blocks, merged_blocks
|
||||
);
|
||||
|
||||
// Verify MDBX table is still empty
|
||||
let mdbx_count = db.table::<tables::AccountsHistory>().unwrap().len();
|
||||
assert_eq!(mdbx_count, 0, "MDBX should remain empty when RocksDB is enabled");
|
||||
}
|
||||
|
||||
/// Test unwind removes account history from RocksDB when enabled.
|
||||
/// Verifies execute writes to RocksDB (not MDBX) when enabled.
|
||||
#[tokio::test]
|
||||
async fn unwind_removes_account_history_from_rocksdb() {
|
||||
async fn execute_writes_to_rocksdb() {
|
||||
let db = TestStageDB::default();
|
||||
setup_with_changesets(&db, 0..=5);
|
||||
|
||||
// Enable RocksDB for account history
|
||||
db.factory.set_storage_settings_cache(
|
||||
StorageSettings::legacy().with_account_history_in_rocksdb(true),
|
||||
);
|
||||
|
||||
// setup changesets for blocks 0-10
|
||||
db.commit(|tx| {
|
||||
for block in 0..=10u64 {
|
||||
tx.put::<tables::BlockBodyIndices>(
|
||||
block,
|
||||
StoredBlockBodyIndices { tx_count: 3, ..Default::default() },
|
||||
)?;
|
||||
tx.put::<tables::AccountChangeSets>(block, acc())?;
|
||||
}
|
||||
Ok(())
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
// Run stage to index blocks 0-10
|
||||
run(&db, 10, None);
|
||||
|
||||
// Verify RocksDB has all blocks
|
||||
let rocksdb = db.factory.rocksdb_provider();
|
||||
let before_unwind = rocksdb.get::<tables::AccountsHistory>(shard(u64::MAX)).unwrap();
|
||||
assert!(before_unwind.is_some(), "RocksDB should have data before unwind");
|
||||
let before_blocks: Vec<u64> = before_unwind.unwrap().iter().collect();
|
||||
assert_eq!(before_blocks, (0..=10).collect::<Vec<_>>(), "Should have blocks 0-10");
|
||||
|
||||
// Unwind to block 5
|
||||
unwind(&db, 10, 5);
|
||||
|
||||
// Verify RocksDB only contains blocks 0-5 (blocks <= unwind_to are kept)
|
||||
let rocksdb = db.factory.rocksdb_provider();
|
||||
let after_unwind = rocksdb.get::<tables::AccountsHistory>(shard(u64::MAX)).unwrap();
|
||||
assert!(after_unwind.is_some(), "RocksDB should still have data after partial unwind");
|
||||
let after_blocks: Vec<u64> = after_unwind.unwrap().iter().collect();
|
||||
assert_eq!(
|
||||
after_blocks,
|
||||
(0..=5).collect::<Vec<_>>(),
|
||||
"After unwind to 5, should have blocks 0-5"
|
||||
);
|
||||
|
||||
// Verify MDBX table is still empty
|
||||
let mdbx_count = db.table::<tables::AccountsHistory>().unwrap().len();
|
||||
assert_eq!(mdbx_count, 0, "MDBX should remain empty during RocksDB unwind");
|
||||
}
|
||||
|
||||
/// Test unwind to genesis removes all account history from RocksDB.
|
||||
#[tokio::test]
|
||||
async fn unwind_to_genesis_clears_rocksdb_account_history() {
|
||||
let db = TestStageDB::default();
|
||||
|
||||
// Enable RocksDB for account history
|
||||
db.factory.set_storage_settings_cache(
|
||||
StorageSettings::legacy().with_account_history_in_rocksdb(true),
|
||||
);
|
||||
|
||||
// setup changesets for blocks 0-5
|
||||
db.commit(|tx| {
|
||||
for block in 0..=5u64 {
|
||||
tx.put::<tables::BlockBodyIndices>(
|
||||
block,
|
||||
StoredBlockBodyIndices { tx_count: 3, ..Default::default() },
|
||||
)?;
|
||||
tx.put::<tables::AccountChangeSets>(block, acc())?;
|
||||
}
|
||||
Ok(())
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
// Run stage
|
||||
run(&db, 5, None);
|
||||
|
||||
// Verify RocksDB has data
|
||||
// MDBX should be empty, RocksDB should have data
|
||||
assert_eq!(db.table::<tables::AccountsHistory>().unwrap().len(), 0);
|
||||
let rocksdb = db.factory.rocksdb_provider();
|
||||
let before = rocksdb.get::<tables::AccountsHistory>(shard(u64::MAX)).unwrap();
|
||||
assert!(before.is_some(), "RocksDB should have data before unwind");
|
||||
|
||||
// Unwind to genesis (block 0)
|
||||
unwind(&db, 5, 0);
|
||||
|
||||
// Verify RocksDB still has block 0 (blocks <= unwind_to are kept)
|
||||
let rocksdb = db.factory.rocksdb_provider();
|
||||
let after = rocksdb.get::<tables::AccountsHistory>(shard(u64::MAX)).unwrap();
|
||||
assert!(after.is_some(), "RocksDB should still have block 0 after unwind to genesis");
|
||||
let after_blocks: Vec<u64> = after.unwrap().iter().collect();
|
||||
assert_eq!(after_blocks, vec![0], "After unwind to 0, should only have block 0");
|
||||
assert!(rocksdb.get::<tables::AccountsHistory>(shard(u64::MAX)).unwrap().is_some());
|
||||
}
|
||||
|
||||
/// Test unwind with no changesets in range is a no-op.
|
||||
/// Verifies incremental sync merges new indices with existing RocksDB data.
|
||||
/// This is critical: without proper merging, we'd lose previously indexed blocks.
|
||||
#[tokio::test]
|
||||
async fn unwind_with_no_changesets_is_noop() {
|
||||
async fn incremental_sync_merges_indices() {
|
||||
let db = TestStageDB::default();
|
||||
setup_with_changesets(&db, 0..=5);
|
||||
|
||||
// Enable RocksDB for account history
|
||||
db.factory.set_storage_settings_cache(
|
||||
StorageSettings::legacy().with_account_history_in_rocksdb(true),
|
||||
);
|
||||
// First sync: blocks 0-5
|
||||
run(&db, 5, None);
|
||||
|
||||
// Setup changesets only for blocks 0-5 (leave gap at 6-10)
|
||||
// Add more changesets and run incremental sync
|
||||
db.commit(|tx| {
|
||||
for block in 0..=5u64 {
|
||||
tx.put::<tables::BlockBodyIndices>(
|
||||
block,
|
||||
StoredBlockBodyIndices { tx_count: 3, ..Default::default() },
|
||||
)?;
|
||||
tx.put::<tables::AccountChangeSets>(block, acc())?;
|
||||
}
|
||||
// Add block body indices for 6-10 but NO changesets
|
||||
for block in 6..=10u64 {
|
||||
tx.put::<tables::BlockBodyIndices>(
|
||||
block,
|
||||
StoredBlockBodyIndices { tx_count: 3, ..Default::default() },
|
||||
)?;
|
||||
tx.put::<tables::AccountChangeSets>(block, acc())?;
|
||||
}
|
||||
Ok(())
|
||||
})
|
||||
.unwrap();
|
||||
run(&db, 10, Some(5));
|
||||
|
||||
// Run stage to index blocks 0-10
|
||||
run(&db, 10, None);
|
||||
|
||||
// Record state before unwind
|
||||
// Should have all blocks 0-10 merged
|
||||
let rocksdb = db.factory.rocksdb_provider();
|
||||
let before = rocksdb.get::<tables::AccountsHistory>(shard(u64::MAX)).unwrap();
|
||||
let before_blocks: Vec<u64> = before.unwrap().iter().collect();
|
||||
|
||||
// Unwind from 10 to 7 (range 8-10 has no changesets)
|
||||
unwind(&db, 10, 7);
|
||||
|
||||
// Verify RocksDB data is unchanged (no changesets in unwind range)
|
||||
let rocksdb = db.factory.rocksdb_provider();
|
||||
let after = rocksdb.get::<tables::AccountsHistory>(shard(u64::MAX)).unwrap();
|
||||
let after_blocks: Vec<u64> = after.unwrap().iter().collect();
|
||||
assert_eq!(
|
||||
before_blocks, after_blocks,
|
||||
"Data should be unchanged when no changesets in unwind range"
|
||||
);
|
||||
let blocks: Vec<u64> = rocksdb
|
||||
.get::<tables::AccountsHistory>(shard(u64::MAX))
|
||||
.unwrap()
|
||||
.unwrap()
|
||||
.iter()
|
||||
.collect();
|
||||
assert_eq!(blocks, (0..=10).collect::<Vec<_>>());
|
||||
}
|
||||
|
||||
/// Test unwind preserves unrelated addresses.
|
||||
/// Verifies unwind removes indices >= unwind_to from RocksDB.
|
||||
#[tokio::test]
|
||||
async fn unwind_preserves_unrelated_addresses() {
|
||||
async fn unwind_removes_indices() {
|
||||
let db = TestStageDB::default();
|
||||
setup_with_changesets(&db, 0..=10);
|
||||
|
||||
// Enable RocksDB for account history
|
||||
db.factory.set_storage_settings_cache(
|
||||
StorageSettings::legacy().with_account_history_in_rocksdb(true),
|
||||
);
|
||||
|
||||
let address_a = address!("0x0000000000000000000000000000000000000001");
|
||||
let address_b = address!("0x0000000000000000000000000000000000000002");
|
||||
|
||||
// Setup: address_a has changes in blocks 0-10, address_b only in blocks 0-5
|
||||
db.commit(|tx| {
|
||||
for block in 0..=10u64 {
|
||||
tx.put::<tables::BlockBodyIndices>(
|
||||
block,
|
||||
StoredBlockBodyIndices { tx_count: 3, ..Default::default() },
|
||||
)?;
|
||||
// address_a changes in all blocks
|
||||
tx.put::<tables::AccountChangeSets>(
|
||||
block,
|
||||
AccountBeforeTx { address: address_a, info: None },
|
||||
)?;
|
||||
// address_b only changes in blocks 0-5
|
||||
if block <= 5 {
|
||||
tx.put::<tables::AccountChangeSets>(
|
||||
block,
|
||||
AccountBeforeTx { address: address_b, info: None },
|
||||
)?;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
// Run stage
|
||||
run(&db, 10, None);
|
||||
unwind(&db, 10, 5);
|
||||
|
||||
// Record address_b state before unwind
|
||||
// Should only have blocks 0-5 remaining
|
||||
let rocksdb = db.factory.rocksdb_provider();
|
||||
let key_b = ShardedKey { key: address_b, highest_block_number: u64::MAX };
|
||||
let before_b = rocksdb.get::<tables::AccountsHistory>(key_b.clone()).unwrap();
|
||||
let before_b_blocks: Vec<u64> = before_b.unwrap().iter().collect();
|
||||
|
||||
// Unwind from 10 to 7 (only address_a has changes in 8-10)
|
||||
unwind(&db, 10, 7);
|
||||
|
||||
// Verify address_b is unchanged (no changes in unwind range)
|
||||
let rocksdb = db.factory.rocksdb_provider();
|
||||
let after_b = rocksdb.get::<tables::AccountsHistory>(key_b).unwrap();
|
||||
let after_b_blocks: Vec<u64> = after_b.unwrap().iter().collect();
|
||||
assert_eq!(
|
||||
before_b_blocks, after_b_blocks,
|
||||
"Address B should be unchanged when it has no changes in unwind range"
|
||||
);
|
||||
|
||||
// Verify address_a was properly unwound
|
||||
let key_a = ShardedKey { key: address_a, highest_block_number: u64::MAX };
|
||||
let after_a = rocksdb.get::<tables::AccountsHistory>(key_a).unwrap();
|
||||
let after_a_blocks: Vec<u64> = after_a.unwrap().iter().collect();
|
||||
assert_eq!(
|
||||
after_a_blocks,
|
||||
(0..=7).collect::<Vec<_>>(),
|
||||
"Address A should have blocks 0-7 after unwind"
|
||||
);
|
||||
}
|
||||
|
||||
/// Test unwind deletes entry when all blocks are removed.
|
||||
#[tokio::test]
|
||||
async fn unwind_deletes_entry_when_all_blocks_removed() {
|
||||
let db = TestStageDB::default();
|
||||
|
||||
// Enable RocksDB for account history
|
||||
db.factory.set_storage_settings_cache(
|
||||
StorageSettings::legacy().with_account_history_in_rocksdb(true),
|
||||
);
|
||||
|
||||
// Setup: only add changesets for blocks 5-10
|
||||
db.commit(|tx| {
|
||||
for block in 0..=10u64 {
|
||||
tx.put::<tables::BlockBodyIndices>(
|
||||
block,
|
||||
StoredBlockBodyIndices { tx_count: 3, ..Default::default() },
|
||||
)?;
|
||||
if block >= 5 {
|
||||
tx.put::<tables::AccountChangeSets>(block, acc())?;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
// Run stage
|
||||
run(&db, 10, None);
|
||||
|
||||
// Verify RocksDB has data
|
||||
let rocksdb = db.factory.rocksdb_provider();
|
||||
let before = rocksdb.get::<tables::AccountsHistory>(shard(u64::MAX)).unwrap();
|
||||
assert!(before.is_some(), "RocksDB should have data before unwind");
|
||||
|
||||
// Unwind to block 4 (removes blocks 5-10, which is ALL the data)
|
||||
unwind(&db, 10, 4);
|
||||
|
||||
// Verify entry is deleted (no blocks left)
|
||||
let rocksdb = db.factory.rocksdb_provider();
|
||||
let after = rocksdb.get::<tables::AccountsHistory>(shard(u64::MAX)).unwrap();
|
||||
assert!(after.is_none(), "Entry should be deleted when all blocks are removed");
|
||||
let blocks: Vec<u64> = rocksdb
|
||||
.get::<tables::AccountsHistory>(shard(u64::MAX))
|
||||
.unwrap()
|
||||
.unwrap()
|
||||
.iter()
|
||||
.collect();
|
||||
assert_eq!(blocks, (0..=5).collect::<Vec<_>>());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -742,117 +742,25 @@ mod tests {
|
||||
}
|
||||
}
|
||||
|
||||
/// RocksDB-specific tests for storage history indexing.
|
||||
///
|
||||
/// These tests verify that when `storages_history_in_rocksdb` is enabled:
|
||||
/// - Execute writes indices to RocksDB instead of MDBX
|
||||
/// - Incremental syncs properly merge with existing RocksDB data
|
||||
/// - Unwind correctly removes indices from RocksDB
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
mod rocksdb_tests {
|
||||
use super::*;
|
||||
use reth_provider::RocksDBProviderFactory;
|
||||
use reth_storage_api::StorageSettings;
|
||||
|
||||
#[tokio::test]
|
||||
async fn execute_writes_storage_history_to_rocksdb_when_enabled() {
|
||||
let db = TestStageDB::default();
|
||||
|
||||
// Enable RocksDB for storage history
|
||||
/// Helper to setup RocksDB-enabled test environment with storage changesets.
|
||||
fn setup_with_changesets(db: &TestStageDB, block_range: std::ops::RangeInclusive<u64>) {
|
||||
db.factory.set_storage_settings_cache(
|
||||
StorageSettings::legacy().with_storages_history_in_rocksdb(true),
|
||||
);
|
||||
|
||||
// setup
|
||||
partial_setup(&db);
|
||||
|
||||
// run
|
||||
run(&db, MAX_BLOCK, None);
|
||||
|
||||
// Verify MDBX table is empty (data should be in RocksDB)
|
||||
let mdbx_count = db.table::<tables::StoragesHistory>().unwrap().len();
|
||||
assert_eq!(
|
||||
mdbx_count, 0,
|
||||
"MDBX StoragesHistory should be empty when RocksDB is enabled"
|
||||
);
|
||||
|
||||
// Verify RocksDB has the data
|
||||
let rocksdb = db.factory.rocksdb_provider();
|
||||
let result = rocksdb.get::<tables::StoragesHistory>(shard(u64::MAX)).unwrap();
|
||||
assert!(result.is_some(), "Storage history should exist in RocksDB");
|
||||
|
||||
// Verify the block numbers are correct
|
||||
let list = result.unwrap();
|
||||
let blocks: Vec<u64> = list.iter().collect();
|
||||
assert!(!blocks.is_empty(), "Block list should not be empty");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn first_sync_clears_stale_rocksdb_storage_history() {
|
||||
let db = TestStageDB::default();
|
||||
|
||||
// Enable RocksDB for storage history
|
||||
db.factory.set_storage_settings_cache(
|
||||
StorageSettings::legacy().with_storages_history_in_rocksdb(true),
|
||||
);
|
||||
|
||||
// Seed RocksDB with a stale entry for a different address/storage key
|
||||
let rocksdb = db.factory.rocksdb_provider();
|
||||
let stale_address = address!("0x0000000000000000000000000000000000000002");
|
||||
let stale_storage_key =
|
||||
b256!("0x0000000000000000000000000000000000000000000000000000000000000002");
|
||||
let stale_key = StorageShardedKey {
|
||||
address: stale_address,
|
||||
sharded_key: ShardedKey { key: stale_storage_key, highest_block_number: u64::MAX },
|
||||
};
|
||||
rocksdb.put::<tables::StoragesHistory>(stale_key.clone(), &list(&[999])).unwrap();
|
||||
|
||||
// setup
|
||||
partial_setup(&db);
|
||||
|
||||
// run (first sync)
|
||||
run(&db, MAX_BLOCK, None);
|
||||
|
||||
// Verify stale entry is removed
|
||||
let rocksdb = db.factory.rocksdb_provider();
|
||||
let stale = rocksdb.get::<tables::StoragesHistory>(stale_key).unwrap();
|
||||
assert!(
|
||||
stale.is_none(),
|
||||
"Stale RocksDB storage history should be cleared on first sync"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn execute_writes_to_mdbx_when_rocksdb_disabled() {
|
||||
let db = TestStageDB::default();
|
||||
|
||||
// Ensure RocksDB is disabled for storage history (default)
|
||||
db.factory.set_storage_settings_cache(
|
||||
StorageSettings::legacy().with_storages_history_in_rocksdb(false),
|
||||
);
|
||||
|
||||
// setup
|
||||
partial_setup(&db);
|
||||
|
||||
// run
|
||||
run(&db, MAX_BLOCK, None);
|
||||
|
||||
// Verify MDBX table has data
|
||||
let mdbx_count = db.table::<tables::StoragesHistory>().unwrap().len();
|
||||
assert!(
|
||||
mdbx_count > 0,
|
||||
"MDBX StoragesHistory should have data when RocksDB is disabled"
|
||||
);
|
||||
}
|
||||
|
||||
/// Test incremental sync with RocksDB: run stage twice and verify indices are merged.
|
||||
#[tokio::test]
|
||||
async fn incremental_sync_merges_indices_in_rocksdb() {
|
||||
let db = TestStageDB::default();
|
||||
|
||||
// Enable RocksDB for storage history
|
||||
db.factory.set_storage_settings_cache(
|
||||
StorageSettings::legacy().with_storages_history_in_rocksdb(true),
|
||||
);
|
||||
|
||||
// First run: setup changesets for blocks 0-5 and index them
|
||||
let first_run_end: u64 = 5;
|
||||
db.commit(|tx| {
|
||||
for block in 0..=first_run_end {
|
||||
for block in block_range {
|
||||
tx.put::<tables::BlockBodyIndices>(
|
||||
block,
|
||||
StoredBlockBodyIndices { tx_count: 3, ..Default::default() },
|
||||
@@ -865,332 +773,78 @@ mod tests {
|
||||
Ok(())
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
// Run stage for first batch (first_sync = true since checkpoint is 0)
|
||||
run(&db, first_run_end, None);
|
||||
|
||||
// Verify first run wrote to RocksDB
|
||||
let rocksdb = db.factory.rocksdb_provider();
|
||||
let first_result = rocksdb.get::<tables::StoragesHistory>(shard(u64::MAX)).unwrap();
|
||||
assert!(first_result.is_some(), "First run should write to RocksDB");
|
||||
let first_blocks: Vec<u64> = first_result.unwrap().iter().collect();
|
||||
assert_eq!(
|
||||
first_blocks.len(),
|
||||
(first_run_end + 1) as usize,
|
||||
"First run should have blocks 0-5"
|
||||
);
|
||||
|
||||
// Second run: add changesets for blocks 6-10
|
||||
let second_run_end: u64 = 10;
|
||||
db.commit(|tx| {
|
||||
for block in (first_run_end + 1)..=second_run_end {
|
||||
tx.put::<tables::BlockBodyIndices>(
|
||||
block,
|
||||
StoredBlockBodyIndices { tx_count: 3, ..Default::default() },
|
||||
)?;
|
||||
tx.put::<tables::StorageChangeSets>(
|
||||
block_number_address(block),
|
||||
storage(STORAGE_KEY),
|
||||
)?;
|
||||
}
|
||||
Ok(())
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
// Run stage for second batch (first_sync = false since checkpoint > 0)
|
||||
run(&db, second_run_end, Some(first_run_end));
|
||||
|
||||
// Verify second run merged indices in RocksDB
|
||||
let rocksdb = db.factory.rocksdb_provider();
|
||||
let merged_result = rocksdb.get::<tables::StoragesHistory>(shard(u64::MAX)).unwrap();
|
||||
assert!(merged_result.is_some(), "Second run should have data in RocksDB");
|
||||
let merged_blocks: Vec<u64> = merged_result.unwrap().iter().collect();
|
||||
|
||||
// Should contain all blocks from 0 to 10
|
||||
let expected_blocks: Vec<u64> = (0..=second_run_end).collect();
|
||||
assert_eq!(
|
||||
merged_blocks, expected_blocks,
|
||||
"Incremental sync should merge all indices: expected {:?}, got {:?}",
|
||||
expected_blocks, merged_blocks
|
||||
);
|
||||
|
||||
// Verify MDBX table is still empty
|
||||
let mdbx_count = db.table::<tables::StoragesHistory>().unwrap().len();
|
||||
assert_eq!(mdbx_count, 0, "MDBX should remain empty when RocksDB is enabled");
|
||||
}
|
||||
|
||||
/// Test unwind removes storage history from RocksDB when enabled.
|
||||
/// Verifies execute writes to RocksDB (not MDBX) when enabled.
|
||||
#[tokio::test]
|
||||
async fn unwind_removes_storage_history_from_rocksdb() {
|
||||
async fn execute_writes_to_rocksdb() {
|
||||
let db = TestStageDB::default();
|
||||
setup_with_changesets(&db, 0..=5);
|
||||
|
||||
// Enable RocksDB for storage history
|
||||
db.factory.set_storage_settings_cache(
|
||||
StorageSettings::legacy().with_storages_history_in_rocksdb(true),
|
||||
);
|
||||
|
||||
// setup changesets for blocks 0-10
|
||||
db.commit(|tx| {
|
||||
for block in 0..=10u64 {
|
||||
tx.put::<tables::BlockBodyIndices>(
|
||||
block,
|
||||
StoredBlockBodyIndices { tx_count: 3, ..Default::default() },
|
||||
)?;
|
||||
tx.put::<tables::StorageChangeSets>(
|
||||
block_number_address(block),
|
||||
storage(STORAGE_KEY),
|
||||
)?;
|
||||
}
|
||||
Ok(())
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
// Run stage to index blocks 0-10
|
||||
run(&db, 10, None);
|
||||
|
||||
// Verify RocksDB has all blocks
|
||||
let rocksdb = db.factory.rocksdb_provider();
|
||||
let before_unwind = rocksdb.get::<tables::StoragesHistory>(shard(u64::MAX)).unwrap();
|
||||
assert!(before_unwind.is_some(), "RocksDB should have data before unwind");
|
||||
let before_blocks: Vec<u64> = before_unwind.unwrap().iter().collect();
|
||||
assert_eq!(before_blocks, (0..=10).collect::<Vec<_>>(), "Should have blocks 0-10");
|
||||
|
||||
// Unwind to block 5
|
||||
unwind(&db, 10, 5);
|
||||
|
||||
// Verify RocksDB only contains blocks 0-5 (blocks <= unwind_to are kept)
|
||||
let rocksdb = db.factory.rocksdb_provider();
|
||||
let after_unwind = rocksdb.get::<tables::StoragesHistory>(shard(u64::MAX)).unwrap();
|
||||
assert!(after_unwind.is_some(), "RocksDB should still have data after partial unwind");
|
||||
let after_blocks: Vec<u64> = after_unwind.unwrap().iter().collect();
|
||||
assert_eq!(
|
||||
after_blocks,
|
||||
(0..=5).collect::<Vec<_>>(),
|
||||
"After unwind to 5, should have blocks 0-5"
|
||||
);
|
||||
|
||||
// Verify MDBX table is still empty
|
||||
let mdbx_count = db.table::<tables::StoragesHistory>().unwrap().len();
|
||||
assert_eq!(mdbx_count, 0, "MDBX should remain empty during RocksDB unwind");
|
||||
}
|
||||
|
||||
/// Test unwind to genesis removes all storage history from RocksDB.
|
||||
#[tokio::test]
|
||||
async fn unwind_to_genesis_clears_rocksdb_storage_history() {
|
||||
let db = TestStageDB::default();
|
||||
|
||||
// Enable RocksDB for storage history
|
||||
db.factory.set_storage_settings_cache(
|
||||
StorageSettings::legacy().with_storages_history_in_rocksdb(true),
|
||||
);
|
||||
|
||||
// setup changesets for blocks 0-5
|
||||
db.commit(|tx| {
|
||||
for block in 0..=5u64 {
|
||||
tx.put::<tables::BlockBodyIndices>(
|
||||
block,
|
||||
StoredBlockBodyIndices { tx_count: 3, ..Default::default() },
|
||||
)?;
|
||||
tx.put::<tables::StorageChangeSets>(
|
||||
block_number_address(block),
|
||||
storage(STORAGE_KEY),
|
||||
)?;
|
||||
}
|
||||
Ok(())
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
// Run stage
|
||||
run(&db, 5, None);
|
||||
|
||||
// Verify RocksDB has data
|
||||
// MDBX should be empty, RocksDB should have data
|
||||
assert_eq!(db.table::<tables::StoragesHistory>().unwrap().len(), 0);
|
||||
let rocksdb = db.factory.rocksdb_provider();
|
||||
let before = rocksdb.get::<tables::StoragesHistory>(shard(u64::MAX)).unwrap();
|
||||
assert!(before.is_some(), "RocksDB should have data before unwind");
|
||||
|
||||
// Unwind to genesis (block 0)
|
||||
unwind(&db, 5, 0);
|
||||
|
||||
// Verify RocksDB still has block 0 (blocks <= unwind_to are kept)
|
||||
let rocksdb = db.factory.rocksdb_provider();
|
||||
let after = rocksdb.get::<tables::StoragesHistory>(shard(u64::MAX)).unwrap();
|
||||
assert!(after.is_some(), "RocksDB should still have block 0 after unwind to genesis");
|
||||
let after_blocks: Vec<u64> = after.unwrap().iter().collect();
|
||||
assert_eq!(after_blocks, vec![0], "After unwind to 0, should only have block 0");
|
||||
assert!(rocksdb.get::<tables::StoragesHistory>(shard(u64::MAX)).unwrap().is_some());
|
||||
}
|
||||
|
||||
/// Test unwind with no changesets in range is a no-op.
|
||||
/// Verifies incremental sync merges new indices with existing RocksDB data.
|
||||
/// This is critical: without proper merging, we'd lose previously indexed blocks.
|
||||
#[tokio::test]
|
||||
async fn unwind_with_no_changesets_is_noop() {
|
||||
async fn incremental_sync_merges_indices() {
|
||||
let db = TestStageDB::default();
|
||||
setup_with_changesets(&db, 0..=5);
|
||||
|
||||
// Enable RocksDB for storage history
|
||||
db.factory.set_storage_settings_cache(
|
||||
StorageSettings::legacy().with_storages_history_in_rocksdb(true),
|
||||
);
|
||||
// First sync: blocks 0-5
|
||||
run(&db, 5, None);
|
||||
|
||||
// Setup changesets only for blocks 0-5 (leave gap at 6-10)
|
||||
// Add more changesets and run incremental sync
|
||||
db.commit(|tx| {
|
||||
for block in 0..=5u64 {
|
||||
tx.put::<tables::BlockBodyIndices>(
|
||||
block,
|
||||
StoredBlockBodyIndices { tx_count: 3, ..Default::default() },
|
||||
)?;
|
||||
tx.put::<tables::StorageChangeSets>(
|
||||
block_number_address(block),
|
||||
storage(STORAGE_KEY),
|
||||
)?;
|
||||
}
|
||||
// Add block body indices for 6-10 but NO changesets
|
||||
for block in 6..=10u64 {
|
||||
tx.put::<tables::BlockBodyIndices>(
|
||||
block,
|
||||
StoredBlockBodyIndices { tx_count: 3, ..Default::default() },
|
||||
)?;
|
||||
}
|
||||
Ok(())
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
// Run stage to index blocks 0-10
|
||||
run(&db, 10, None);
|
||||
|
||||
// Record state before unwind
|
||||
let rocksdb = db.factory.rocksdb_provider();
|
||||
let before = rocksdb.get::<tables::StoragesHistory>(shard(u64::MAX)).unwrap();
|
||||
let before_blocks: Vec<u64> = before.unwrap().iter().collect();
|
||||
|
||||
// Unwind from 10 to 7 (range 8-10 has no changesets)
|
||||
unwind(&db, 10, 7);
|
||||
|
||||
// Verify RocksDB data is unchanged (no changesets in unwind range)
|
||||
let rocksdb = db.factory.rocksdb_provider();
|
||||
let after = rocksdb.get::<tables::StoragesHistory>(shard(u64::MAX)).unwrap();
|
||||
let after_blocks: Vec<u64> = after.unwrap().iter().collect();
|
||||
assert_eq!(
|
||||
before_blocks, after_blocks,
|
||||
"Data should be unchanged when no changesets in unwind range"
|
||||
);
|
||||
}
|
||||
|
||||
/// Test unwind preserves unrelated storage keys.
|
||||
#[tokio::test]
|
||||
async fn unwind_preserves_unrelated_storage_keys() {
|
||||
let db = TestStageDB::default();
|
||||
|
||||
// Enable RocksDB for storage history
|
||||
db.factory.set_storage_settings_cache(
|
||||
StorageSettings::legacy().with_storages_history_in_rocksdb(true),
|
||||
);
|
||||
|
||||
let key_1 = B256::with_last_byte(1);
|
||||
let key_2 = B256::with_last_byte(2);
|
||||
|
||||
// Setup: key_1 has changes in blocks 0-10, key_2 only in blocks 0-5
|
||||
db.commit(|tx| {
|
||||
for block in 0..=10u64 {
|
||||
tx.put::<tables::BlockBodyIndices>(
|
||||
block,
|
||||
StoredBlockBodyIndices { tx_count: 3, ..Default::default() },
|
||||
)?;
|
||||
// key_1 changes in all blocks
|
||||
tx.put::<tables::StorageChangeSets>(
|
||||
block_number_address(block),
|
||||
storage(key_1),
|
||||
storage(STORAGE_KEY),
|
||||
)?;
|
||||
// key_2 only changes in blocks 0-5
|
||||
if block <= 5 {
|
||||
tx.put::<tables::StorageChangeSets>(
|
||||
block_number_address(block),
|
||||
storage(key_2),
|
||||
)?;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
})
|
||||
.unwrap();
|
||||
run(&db, 10, Some(5));
|
||||
|
||||
// Run stage
|
||||
run(&db, 10, None);
|
||||
|
||||
// Record key_2 state before unwind
|
||||
// Should have all blocks 0-10 merged
|
||||
let rocksdb = db.factory.rocksdb_provider();
|
||||
let shard_key_2 = StorageShardedKey {
|
||||
address: ADDRESS,
|
||||
sharded_key: ShardedKey { key: key_2, highest_block_number: u64::MAX },
|
||||
};
|
||||
let before_2 = rocksdb.get::<tables::StoragesHistory>(shard_key_2.clone()).unwrap();
|
||||
let before_2_blocks: Vec<u64> = before_2.unwrap().iter().collect();
|
||||
|
||||
// Unwind from 10 to 7 (only key_1 has changes in 8-10)
|
||||
unwind(&db, 10, 7);
|
||||
|
||||
// Verify key_2 is unchanged (no changes in unwind range)
|
||||
let rocksdb = db.factory.rocksdb_provider();
|
||||
let after_2 = rocksdb.get::<tables::StoragesHistory>(shard_key_2).unwrap();
|
||||
let after_2_blocks: Vec<u64> = after_2.unwrap().iter().collect();
|
||||
assert_eq!(
|
||||
before_2_blocks, after_2_blocks,
|
||||
"Key 2 should be unchanged when it has no changes in unwind range"
|
||||
);
|
||||
|
||||
// Verify key_1 was properly unwound
|
||||
let shard_key_1 = StorageShardedKey {
|
||||
address: ADDRESS,
|
||||
sharded_key: ShardedKey { key: key_1, highest_block_number: u64::MAX },
|
||||
};
|
||||
let after_1 = rocksdb.get::<tables::StoragesHistory>(shard_key_1).unwrap();
|
||||
let after_1_blocks: Vec<u64> = after_1.unwrap().iter().collect();
|
||||
assert_eq!(
|
||||
after_1_blocks,
|
||||
(0..=7).collect::<Vec<_>>(),
|
||||
"Key 1 should have blocks 0-7 after unwind"
|
||||
);
|
||||
let blocks: Vec<u64> = rocksdb
|
||||
.get::<tables::StoragesHistory>(shard(u64::MAX))
|
||||
.unwrap()
|
||||
.unwrap()
|
||||
.iter()
|
||||
.collect();
|
||||
assert_eq!(blocks, (0..=10).collect::<Vec<_>>());
|
||||
}
|
||||
|
||||
/// Test unwind deletes entry when all blocks are removed.
|
||||
/// Verifies unwind removes indices >= unwind_to from RocksDB.
|
||||
#[tokio::test]
|
||||
async fn unwind_deletes_entry_when_all_blocks_removed() {
|
||||
async fn unwind_removes_indices() {
|
||||
let db = TestStageDB::default();
|
||||
setup_with_changesets(&db, 0..=10);
|
||||
|
||||
// Enable RocksDB for storage history
|
||||
db.factory.set_storage_settings_cache(
|
||||
StorageSettings::legacy().with_storages_history_in_rocksdb(true),
|
||||
);
|
||||
|
||||
// Setup: only add changesets for blocks 5-10
|
||||
db.commit(|tx| {
|
||||
for block in 0..=10u64 {
|
||||
tx.put::<tables::BlockBodyIndices>(
|
||||
block,
|
||||
StoredBlockBodyIndices { tx_count: 3, ..Default::default() },
|
||||
)?;
|
||||
if block >= 5 {
|
||||
tx.put::<tables::StorageChangeSets>(
|
||||
block_number_address(block),
|
||||
storage(STORAGE_KEY),
|
||||
)?;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
// Run stage
|
||||
run(&db, 10, None);
|
||||
unwind(&db, 10, 5);
|
||||
|
||||
// Verify RocksDB has data
|
||||
// Should only have blocks 0-5 remaining
|
||||
let rocksdb = db.factory.rocksdb_provider();
|
||||
let before = rocksdb.get::<tables::StoragesHistory>(shard(u64::MAX)).unwrap();
|
||||
assert!(before.is_some(), "RocksDB should have data before unwind");
|
||||
|
||||
// Unwind to block 4 (removes blocks 5-10, which is ALL the data)
|
||||
unwind(&db, 10, 4);
|
||||
|
||||
// Verify entry is deleted (no blocks left)
|
||||
let rocksdb = db.factory.rocksdb_provider();
|
||||
let after = rocksdb.get::<tables::StoragesHistory>(shard(u64::MAX)).unwrap();
|
||||
assert!(after.is_none(), "Entry should be deleted when all blocks are removed");
|
||||
let blocks: Vec<u64> = rocksdb
|
||||
.get::<tables::StoragesHistory>(shard(u64::MAX))
|
||||
.unwrap()
|
||||
.unwrap()
|
||||
.iter()
|
||||
.collect();
|
||||
assert_eq!(blocks, (0..=5).collect::<Vec<_>>());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user