Mirror of https://github.com/paradigmxyz/reth.git (synced 2026-01-10 15:58:27 -05:00)
test: add unwind tests for shard boundary handling in account and storage history
- Introduced a new test, `unwind_crosses_shard_boundary`, in both `index_account_history.rs` and `index_storage_history.rs`.
- The test verifies that unwinding correctly handles indices that span multiple shards: after the unwind, the sentinel shard contains only blocks below the unwind threshold and the old full shard is deleted.
- These additions improve coverage of shard-management edge cases and strengthen the unwind path of the RocksDB implementation.
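For orientation, here is a small, self-contained sketch of the sharded-history layout these tests exercise. It is an illustrative model only: `SHARD_LEN`, `shard_layout`, and the `BTreeMap` stand in for reth's `NUM_OF_INDICES_IN_SHARD`, `ShardedKey`, and `BlockNumberList`; the point it illustrates, taken from the tests below, is that each full shard is keyed by its highest block number while the trailing partial shard lives under a `u64::MAX` sentinel key.

```rust
use std::collections::BTreeMap;

const SHARD_LEN: usize = 100; // stand-in for NUM_OF_INDICES_IN_SHARD (illustrative value)

/// Splits a sorted block list into shards: every full shard is keyed by its highest
/// block number, and the trailing (possibly partial) shard by the `u64::MAX` sentinel.
fn shard_layout(blocks: &[u64]) -> BTreeMap<u64, Vec<u64>> {
    let mut shards = BTreeMap::new();
    let mut chunks = blocks.chunks(SHARD_LEN).peekable();
    while let Some(chunk) = chunks.next() {
        let key = if chunks.peek().is_none() {
            u64::MAX // last shard lives under the sentinel key
        } else {
            *chunk.last().expect("chunk is non-empty")
        };
        shards.insert(key, chunk.to_vec());
    }
    shards
}

fn main() {
    // Mirror the test setup: one full shard plus a few overflow blocks.
    let max_block = SHARD_LEN as u64 + 10;
    let shards = shard_layout(&(0..=max_block).collect::<Vec<u64>>());

    // Full shard keyed by its highest block, overflow under the sentinel.
    assert!(shards.contains_key(&(SHARD_LEN as u64 - 1)));
    assert_eq!(shards[&u64::MAX], (SHARD_LEN as u64..=max_block).collect::<Vec<u64>>());
}
```

Unwinding to `SHARD_LEN - 10` must then leave only the sentinel entry, truncated to blocks `0..=unwind_to`, which is exactly what the two new tests assert against `AccountsHistory` and `StoragesHistory`.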
@@ -812,5 +812,40 @@ mod tests {
                 .collect();
             assert_eq!(blocks, (0..=5).collect::<Vec<_>>());
         }
+
+        /// Verifies unwind correctly handles indices spanning multiple shards.
+        ///
+        /// Creates enough blocks to fill one complete shard plus overflow, then unwinds
+        /// to a point inside the first shard. Verifies:
+        /// - Sentinel shard contains only blocks < unwind_to
+        /// - The old full shard (keyed by its max block) is deleted
+        #[tokio::test]
+        async fn unwind_crosses_shard_boundary() {
+            let db = TestStageDB::default();
+            let max_block = NUM_OF_INDICES_IN_SHARD as u64 + 10;
+            let unwind_to = NUM_OF_INDICES_IN_SHARD as u64 - 10;
+
+            setup_with_changesets(&db, 0..=max_block);
+            run(&db, max_block, None);
+            unwind(&db, max_block, unwind_to);
+
+            let rocksdb = db.factory.rocksdb_provider();
+
+            // Sentinel shard should contain only blocks 0..=unwind_to
+            let blocks: Vec<u64> = rocksdb
+                .get::<tables::AccountsHistory>(shard(u64::MAX))
+                .unwrap()
+                .unwrap()
+                .iter()
+                .collect();
+            assert_eq!(blocks, (0..=unwind_to).collect::<Vec<_>>());
+
+            // The old full shard should be deleted
+            let full_shard_max = NUM_OF_INDICES_IN_SHARD as u64 - 1;
+            assert!(rocksdb
+                .get::<tables::AccountsHistory>(shard(full_shard_max))
+                .unwrap()
+                .is_none());
+        }
     }
 }

@@ -846,5 +846,40 @@ mod tests {
                 .collect();
             assert_eq!(blocks, (0..=5).collect::<Vec<_>>());
         }
+
+        /// Verifies unwind correctly handles indices spanning multiple shards.
+        ///
+        /// Creates enough blocks to fill one complete shard plus overflow, then unwinds
+        /// to a point inside the first shard. Verifies:
+        /// - Sentinel shard contains only blocks < unwind_to
+        /// - The old full shard (keyed by its max block) is deleted
+        #[tokio::test]
+        async fn unwind_crosses_shard_boundary() {
+            let db = TestStageDB::default();
+            let max_block = NUM_OF_INDICES_IN_SHARD as u64 + 10;
+            let unwind_to = NUM_OF_INDICES_IN_SHARD as u64 - 10;
+
+            setup_with_changesets(&db, 0..=max_block);
+            run(&db, max_block, None);
+            unwind(&db, max_block, unwind_to);
+
+            let rocksdb = db.factory.rocksdb_provider();
+
+            // Sentinel shard should contain only blocks 0..=unwind_to
+            let blocks: Vec<u64> = rocksdb
+                .get::<tables::StoragesHistory>(shard(u64::MAX))
+                .unwrap()
+                .unwrap()
+                .iter()
+                .collect();
+            assert_eq!(blocks, (0..=unwind_to).collect::<Vec<_>>());
+
+            // The old full shard should be deleted
+            let full_shard_max = NUM_OF_INDICES_IN_SHARD as u64 - 1;
+            assert!(rocksdb
+                .get::<tables::StoragesHistory>(shard(full_shard_max))
+                .unwrap()
+                .is_none());
+        }
     }
 }

@@ -452,7 +452,7 @@ fn flush_account_shards<CURSOR, N>(
     partial_key: alloy_primitives::Address,
     list: &mut Vec<BlockNumber>,
     append_only: bool,
-    flush: bool,
+    flush_all: bool,
 ) -> Result<(), StageError>
 where
     CURSOR: DbCursorRW<reth_db_api::tables::AccountsHistory>
@@ -461,15 +461,15 @@ where
 {
     use reth_db_api::models::ShardedKey;

-    if list.len() > NUM_OF_INDICES_IN_SHARD || flush {
+    if list.len() > NUM_OF_INDICES_IN_SHARD || flush_all {
         let chunks =
             list.chunks(NUM_OF_INDICES_IN_SHARD).map(|c| c.to_vec()).collect::<Vec<Vec<u64>>>();

         let mut iter = chunks.into_iter().peekable();
         while let Some(chunk) = iter.next() {
-            let mut highest = *chunk.last().expect("at least one index");
+            let mut highest = *chunk.last().expect("BlockNumberList shard chunk must be non-empty");

-            if !flush && iter.peek().is_none() {
+            if !flush_all && iter.peek().is_none() {
                 *list = chunk;
             } else {
                 if iter.peek().is_none() {
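The `flush` to `flush_all` rename above sits in the shard-writing path. As a rough, self-contained model of that chunking behaviour (not the reth implementation: `SHARD_LEN`, `flush_shards`, and the returned `(key, blocks)` pairs are illustrative, and keying the final chunk with `u64::MAX` on a full flush is an assumption based on the truncated `if iter.peek().is_none()` branch above), the idea is that full chunks are written out keyed by their highest block while the trailing partial chunk stays buffered unless `flush_all` forces everything out.

```rust
const SHARD_LEN: usize = 100; // assumption: the real constant is NUM_OF_INDICES_IN_SHARD

/// Returns the shards written as (key, blocks); `list` keeps any still-buffered blocks.
fn flush_shards(list: &mut Vec<u64>, flush_all: bool) -> Vec<(u64, Vec<u64>)> {
    let mut written = Vec::new();
    if list.len() > SHARD_LEN || flush_all {
        let chunks: Vec<Vec<u64>> = list.chunks(SHARD_LEN).map(|c| c.to_vec()).collect();
        let mut iter = chunks.into_iter().peekable();
        while let Some(chunk) = iter.next() {
            let highest = *chunk.last().expect("shard chunk must be non-empty");
            if !flush_all && iter.peek().is_none() {
                // Last partial chunk: keep buffering it for the next batch.
                *list = chunk;
            } else {
                // Assumption: the final chunk of a full flush goes under the sentinel
                // key; earlier full chunks are keyed by their highest block.
                let key = if iter.peek().is_none() { u64::MAX } else { highest };
                written.push((key, chunk));
            }
        }
        if flush_all {
            list.clear(); // model detail: reset the buffer after a full flush
        }
    }
    written
}

fn main() {
    let mut list: Vec<u64> = (0..150).collect();
    let written = flush_shards(&mut list, false);
    assert_eq!(written, vec![(99u64, (0..100).collect::<Vec<u64>>())]);
    assert_eq!(list, (100..150).collect::<Vec<u64>>()); // partial chunk stays buffered
}
```

With `flush_all = false` only the full leading chunk is written and the tail stays in `list` for the next batch; with `flush_all = true` the tail would also be written out.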
@@ -487,44 +487,78 @@ where

 /// Generic helper to unwind history indices using `RocksDB`.
 ///
-/// For each affected partial key:
-/// 1. Reads the existing shard from `RocksDB` (the one with `u64::MAX` sentinel)
-/// 2. Filters out block numbers >= `first_block_to_remove`
-/// 3. If the result is empty, deletes the entry
-/// 4. Otherwise, writes back the truncated shard
-///
-/// Note: This only handles the sentinel shard (`u64::MAX`). This is correct for typical
-/// unwind operations where recently-added blocks are always in the sentinel shard.
+/// For each affected partial key, iterates shards in reverse order and:
+/// 1. Deletes shards where all indices are `>= first_block_to_remove`
+/// 2. For the boundary shard (contains indices both above and below the threshold), filters to keep
+///    only indices `< first_block_to_remove` and rewrites as sentinel
+/// 3. Preserves earlier shards unchanged
+/// 4. If no indices remain after filtering, the key is fully removed
 #[cfg(all(unix, feature = "rocksdb"))]
-fn unwind_history_via_rocksdb_generic<Provider, T, PK, K>(
+fn unwind_history_via_rocksdb_generic<Provider, T, PK, K, SK>(
     provider: &Provider,
     affected_keys: impl IntoIterator<Item = PK>,
     first_block_to_remove: BlockNumber,
-    mk_sentinel_key: impl Fn(PK) -> K,
+    mk_start_key: impl Fn(&PK) -> K,
+    mk_sentinel_key: impl Fn(&PK) -> K,
+    shard_belongs_to_key: impl Fn(&K, &PK) -> bool,
 ) -> Result<usize, StageError>
 where
     Provider: DBProvider + reth_provider::RocksDBProviderFactory,
     T: reth_db_api::table::Table<Key = K, Value = BlockNumberList>,
-    K: Clone,
+    K: Clone + AsRef<reth_db_api::models::ShardedKey<SK>>,
 {
     let rocksdb = provider.rocksdb_provider();
+    let tx = rocksdb.tx();
     let mut batch = rocksdb.batch();
     let mut count = 0;

     for pk in affected_keys {
-        let key = mk_sentinel_key(pk);
-        if let Some(existing_list) = rocksdb.get::<T>(key.clone())? {
-            // Keep blocks < first_block_to_remove (matches MDBX semantics)
-            let filtered: Vec<u64> =
-                existing_list.iter().filter(|&block| block < first_block_to_remove).collect();
+        let start_key = mk_start_key(&pk);
+        let mut shards = Vec::<(K, BlockNumberList)>::new();

-            if filtered.is_empty() {
-                batch.delete::<T>(key)?;
-            } else {
-                let new_list = BlockNumberList::new_pre_sorted(filtered);
-                batch.put::<T>(key, &new_list)?;
+        for entry in tx.iter_from::<T>(start_key)? {
+            let (key, list) = entry?;
+            if !shard_belongs_to_key(&key, &pk) {
+                break;
             }
-            count += 1;
+            shards.push((key, list));
         }
+
+        if shards.is_empty() {
+            continue;
+        }
+
+        count += 1;
+        let mut reinsert: Option<Vec<u64>> = None;
+
+        for (key, list) in shards.iter().rev() {
+            let first = list.iter().next().expect("BlockNumberList shard must be non-empty");
+            let highest = key.as_ref().highest_block_number;
+
+            if first >= first_block_to_remove {
+                // Entire shard is above threshold - delete it
+                batch.delete::<T>(key.clone())?;
+                continue;
+            }
+
+            // This shard has indices below the threshold
+            if first_block_to_remove <= highest {
+                // Boundary shard: filter and rewrite as sentinel
+                batch.delete::<T>(key.clone())?;
+                let filtered: Vec<u64> =
+                    list.iter().take_while(|block| *block < first_block_to_remove).collect();
+                if !filtered.is_empty() {
+                    reinsert = Some(filtered);
+                }
+            }
+            // Earlier shards are entirely below threshold - keep them unchanged
+            break;
+        }
+
+        if let Some(indices) = reinsert {
+            let sentinel_key = mk_sentinel_key(&pk);
+            let new_list = BlockNumberList::new_pre_sorted(indices);
+            batch.put::<T>(sentinel_key, &new_list)?;
+        }
     }

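The new doc comment above describes a reverse walk over one key's shards. A minimal sketch of that walk, assuming a `BTreeMap<u64, Vec<u64>>` keyed by highest block number (with `u64::MAX` as the sentinel) in place of the RocksDB table and write batch; `unwind_shards` is a hypothetical helper, not reth code.

```rust
use std::collections::BTreeMap;

/// Removes all blocks >= `first_block_to_remove` for one key's shards, mirroring the
/// three cases above: drop shards entirely above the threshold, truncate the boundary
/// shard and rewrite it under the `u64::MAX` sentinel, and leave earlier shards alone.
fn unwind_shards(shards: &mut BTreeMap<u64, Vec<u64>>, first_block_to_remove: u64) {
    let keys: Vec<u64> = shards.keys().rev().copied().collect();
    for key in keys {
        let first = *shards[&key].first().expect("shard must be non-empty");
        if first >= first_block_to_remove {
            // Every index in this shard is above the threshold: delete it.
            shards.remove(&key);
            continue;
        }
        if first_block_to_remove <= key {
            // Boundary shard: keep only the prefix below the threshold, as the sentinel.
            let kept: Vec<u64> = shards
                .remove(&key)
                .expect("key was just read")
                .into_iter()
                .take_while(|block| *block < first_block_to_remove)
                .collect();
            if !kept.is_empty() {
                shards.insert(u64::MAX, kept);
            }
        }
        // All earlier shards sit entirely below the threshold: stop.
        break;
    }
}

fn main() {
    // One full shard (keyed by its highest block, 99) plus a sentinel shard.
    let mut shards = BTreeMap::from([
        (99u64, (0..=99).collect::<Vec<u64>>()),
        (u64::MAX, (100..=110).collect::<Vec<u64>>()),
    ]);
    unwind_shards(&mut shards, 91); // i.e. unwind_to = 90
    assert_eq!(shards.len(), 1);
    assert_eq!(shards[&u64::MAX], (0..=90).collect::<Vec<u64>>());
}
```

The generic helper above does the same work against the RocksDB batch, with `mk_start_key`, `mk_sentinel_key`, and `shard_belongs_to_key` supplying the table-specific key handling.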
@@ -535,7 +569,7 @@ where
 /// Unwind storage history indices using `RocksDB`.
 ///
 /// Takes a set of affected (address, `storage_key`) pairs and removes all block numbers
-/// >= `first_block_to_remove` from the history indices.
+/// >= `first_block_to_remove` from the history indices across ALL shards.
 #[cfg(all(unix, feature = "rocksdb"))]
 pub(crate) fn unwind_storage_history_via_rocksdb<Provider>(
     provider: &Provider,
@@ -547,18 +581,23 @@ where
 {
     use reth_db_api::models::storage_sharded_key::StorageShardedKey;

-    unwind_history_via_rocksdb_generic::<_, reth_db_api::tables::StoragesHistory, _, _>(
+    unwind_history_via_rocksdb_generic::<_, reth_db_api::tables::StoragesHistory, _, _, _>(
         provider,
         affected_keys,
         first_block_to_remove,
-        |(address, storage_key)| StorageShardedKey::new(address, storage_key, u64::MAX),
+        |(address, storage_key)| StorageShardedKey::new(*address, *storage_key, 0),
+        |(address, storage_key)| StorageShardedKey::new(*address, *storage_key, u64::MAX),
+        |storage_sharded_key, (address, storage_key)| {
+            storage_sharded_key.address == *address &&
+                storage_sharded_key.sharded_key.key == *storage_key
+        },
     )
 }

 /// Unwind account history indices using `RocksDB`.
 ///
 /// Takes a set of affected addresses and removes all block numbers
-/// >= `first_block_to_remove` from the history indices.
+/// >= `first_block_to_remove` from the history indices across ALL shards.
 #[cfg(all(unix, feature = "rocksdb"))]
 pub(crate) fn unwind_account_history_via_rocksdb<Provider>(
     provider: &Provider,
@@ -570,11 +609,13 @@ where
 {
     use reth_db_api::models::ShardedKey;

-    unwind_history_via_rocksdb_generic::<_, reth_db_api::tables::AccountsHistory, _, _>(
+    unwind_history_via_rocksdb_generic::<_, reth_db_api::tables::AccountsHistory, _, _, _>(
         provider,
         affected_addresses,
         first_block_to_remove,
-        |address| ShardedKey::new(address, u64::MAX),
+        |address| ShardedKey::new(*address, 0),
+        |address| ShardedKey::new(*address, u64::MAX),
+        |sharded_key, address| sharded_key.key == *address,
     )
 }

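Illustrative only: how the start-key and belongs-to closures passed above bound the per-key scan. The sketch uses a `BTreeMap` ordered by `(address, highest_block)` as a stand-in for the `AccountsHistory` table and a toy `Addr` type in place of `alloy_primitives::Address`; `range(start_key..)` plays the role of `iter_from`, and the `take_while` predicate corresponds to `shard_belongs_to_key`. `shards_for_address` is a hypothetical helper, not reth code.

```rust
use std::collections::BTreeMap;

type Addr = [u8; 2]; // toy stand-in for alloy_primitives::Address

/// Collects every shard belonging to `address` by scanning from its lowest possible
/// key and stopping at the first shard owned by a different address.
fn shards_for_address(
    table: &BTreeMap<(Addr, u64), Vec<u64>>,
    address: Addr,
) -> Vec<((Addr, u64), Vec<u64>)> {
    let start_key = (address, 0u64); // plays the role of ShardedKey::new(*address, 0)
    table
        .range(start_key..) // plays the role of tx.iter_from::<AccountsHistory>(start_key)
        .take_while(|(key, _)| key.0 == address) // plays the role of shard_belongs_to_key
        .map(|(key, list)| (*key, list.clone()))
        .collect()
}

fn main() {
    let table = BTreeMap::from([
        (([0x11, 0x11], 99u64), vec![0u64, 1, 2]),
        (([0x11, 0x11], u64::MAX), vec![100, 101]),
        (([0x22, 0x22], u64::MAX), vec![7]),
    ]);
    // Both shards for 0x1111 are returned; 0x2222's shard stops the scan.
    assert_eq!(shards_for_address(&table, [0x11, 0x11]).len(), 2);
}
```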