From f6373102169bcb8d115f48d79f46a184b69e8a86 Mon Sep 17 00:00:00 2001 From: yongkangc Date: Mon, 5 Jan 2026 10:12:02 +0000 Subject: [PATCH] test: add unwind tests for shard boundary handling in account and storage history - Introduced a new test `unwind_crosses_shard_boundary` in both `index_account_history.rs` and `index_storage_history.rs`. - The test verifies that unwinding correctly manages indices that span multiple shards, ensuring that the sentinel shard only contains blocks below the unwind threshold and that the old full shard is deleted. - Enhancements to the existing test suite improve coverage for edge cases in shard management. These changes enhance the robustness of the unwind functionality in the RocksDB implementation. --- .../src/stages/index_account_history.rs | 35 ++++++ .../src/stages/index_storage_history.rs | 35 ++++++ crates/stages/stages/src/stages/utils.rs | 105 ++++++++++++------ 3 files changed, 143 insertions(+), 32 deletions(-) diff --git a/crates/stages/stages/src/stages/index_account_history.rs b/crates/stages/stages/src/stages/index_account_history.rs index 0a43d14307..35c45b4b65 100644 --- a/crates/stages/stages/src/stages/index_account_history.rs +++ b/crates/stages/stages/src/stages/index_account_history.rs @@ -812,5 +812,40 @@ mod tests { .collect(); assert_eq!(blocks, (0..=5).collect::>()); } + + /// Verifies unwind correctly handles indices spanning multiple shards. + /// + /// Creates enough blocks to fill one complete shard plus overflow, then unwinds + /// to a point inside the first shard. Verifies: + /// - Sentinel shard contains only blocks < unwind_to + /// - The old full shard (keyed by its max block) is deleted + #[tokio::test] + async fn unwind_crosses_shard_boundary() { + let db = TestStageDB::default(); + let max_block = NUM_OF_INDICES_IN_SHARD as u64 + 10; + let unwind_to = NUM_OF_INDICES_IN_SHARD as u64 - 10; + + setup_with_changesets(&db, 0..=max_block); + run(&db, max_block, None); + unwind(&db, max_block, unwind_to); + + let rocksdb = db.factory.rocksdb_provider(); + + // Sentinel shard should contain only blocks 0..=unwind_to + let blocks: Vec = rocksdb + .get::(shard(u64::MAX)) + .unwrap() + .unwrap() + .iter() + .collect(); + assert_eq!(blocks, (0..=unwind_to).collect::>()); + + // The old full shard should be deleted + let full_shard_max = NUM_OF_INDICES_IN_SHARD as u64 - 1; + assert!(rocksdb + .get::(shard(full_shard_max)) + .unwrap() + .is_none()); + } } } diff --git a/crates/stages/stages/src/stages/index_storage_history.rs b/crates/stages/stages/src/stages/index_storage_history.rs index 49b8c4fc33..3d64f68a1d 100644 --- a/crates/stages/stages/src/stages/index_storage_history.rs +++ b/crates/stages/stages/src/stages/index_storage_history.rs @@ -846,5 +846,40 @@ mod tests { .collect(); assert_eq!(blocks, (0..=5).collect::>()); } + + /// Verifies unwind correctly handles indices spanning multiple shards. + /// + /// Creates enough blocks to fill one complete shard plus overflow, then unwinds + /// to a point inside the first shard. Verifies: + /// - Sentinel shard contains only blocks < unwind_to + /// - The old full shard (keyed by its max block) is deleted + #[tokio::test] + async fn unwind_crosses_shard_boundary() { + let db = TestStageDB::default(); + let max_block = NUM_OF_INDICES_IN_SHARD as u64 + 10; + let unwind_to = NUM_OF_INDICES_IN_SHARD as u64 - 10; + + setup_with_changesets(&db, 0..=max_block); + run(&db, max_block, None); + unwind(&db, max_block, unwind_to); + + let rocksdb = db.factory.rocksdb_provider(); + + // Sentinel shard should contain only blocks 0..=unwind_to + let blocks: Vec = rocksdb + .get::(shard(u64::MAX)) + .unwrap() + .unwrap() + .iter() + .collect(); + assert_eq!(blocks, (0..=unwind_to).collect::>()); + + // The old full shard should be deleted + let full_shard_max = NUM_OF_INDICES_IN_SHARD as u64 - 1; + assert!(rocksdb + .get::(shard(full_shard_max)) + .unwrap() + .is_none()); + } } } diff --git a/crates/stages/stages/src/stages/utils.rs b/crates/stages/stages/src/stages/utils.rs index b2bef3dc7b..bfc1decbe5 100644 --- a/crates/stages/stages/src/stages/utils.rs +++ b/crates/stages/stages/src/stages/utils.rs @@ -452,7 +452,7 @@ fn flush_account_shards( partial_key: alloy_primitives::Address, list: &mut Vec, append_only: bool, - flush: bool, + flush_all: bool, ) -> Result<(), StageError> where CURSOR: DbCursorRW @@ -461,15 +461,15 @@ where { use reth_db_api::models::ShardedKey; - if list.len() > NUM_OF_INDICES_IN_SHARD || flush { + if list.len() > NUM_OF_INDICES_IN_SHARD || flush_all { let chunks = list.chunks(NUM_OF_INDICES_IN_SHARD).map(|c| c.to_vec()).collect::>>(); let mut iter = chunks.into_iter().peekable(); while let Some(chunk) = iter.next() { - let mut highest = *chunk.last().expect("at least one index"); + let mut highest = *chunk.last().expect("BlockNumberList shard chunk must be non-empty"); - if !flush && iter.peek().is_none() { + if !flush_all && iter.peek().is_none() { *list = chunk; } else { if iter.peek().is_none() { @@ -487,44 +487,78 @@ where /// Generic helper to unwind history indices using `RocksDB`. /// -/// For each affected partial key: -/// 1. Reads the existing shard from `RocksDB` (the one with `u64::MAX` sentinel) -/// 2. Filters out block numbers >= `first_block_to_remove` -/// 3. If the result is empty, deletes the entry -/// 4. Otherwise, writes back the truncated shard -/// -/// Note: This only handles the sentinel shard (`u64::MAX`). This is correct for typical -/// unwind operations where recently-added blocks are always in the sentinel shard. +/// For each affected partial key, iterates shards in reverse order and: +/// 1. Deletes shards where all indices are `>= first_block_to_remove` +/// 2. For the boundary shard (contains indices both above and below the threshold), filters to keep +/// only indices `< first_block_to_remove` and rewrites as sentinel +/// 3. Preserves earlier shards unchanged +/// 4. If no indices remain after filtering, the key is fully removed #[cfg(all(unix, feature = "rocksdb"))] -fn unwind_history_via_rocksdb_generic( +fn unwind_history_via_rocksdb_generic( provider: &Provider, affected_keys: impl IntoIterator, first_block_to_remove: BlockNumber, - mk_sentinel_key: impl Fn(PK) -> K, + mk_start_key: impl Fn(&PK) -> K, + mk_sentinel_key: impl Fn(&PK) -> K, + shard_belongs_to_key: impl Fn(&K, &PK) -> bool, ) -> Result where Provider: DBProvider + reth_provider::RocksDBProviderFactory, T: reth_db_api::table::Table, - K: Clone, + K: Clone + AsRef>, { let rocksdb = provider.rocksdb_provider(); + let tx = rocksdb.tx(); let mut batch = rocksdb.batch(); let mut count = 0; for pk in affected_keys { - let key = mk_sentinel_key(pk); - if let Some(existing_list) = rocksdb.get::(key.clone())? { - // Keep blocks < first_block_to_remove (matches MDBX semantics) - let filtered: Vec = - existing_list.iter().filter(|&block| block < first_block_to_remove).collect(); + let start_key = mk_start_key(&pk); + let mut shards = Vec::<(K, BlockNumberList)>::new(); - if filtered.is_empty() { - batch.delete::(key)?; - } else { - let new_list = BlockNumberList::new_pre_sorted(filtered); - batch.put::(key, &new_list)?; + for entry in tx.iter_from::(start_key)? { + let (key, list) = entry?; + if !shard_belongs_to_key(&key, &pk) { + break; } - count += 1; + shards.push((key, list)); + } + + if shards.is_empty() { + continue; + } + + count += 1; + let mut reinsert: Option> = None; + + for (key, list) in shards.iter().rev() { + let first = list.iter().next().expect("BlockNumberList shard must be non-empty"); + let highest = key.as_ref().highest_block_number; + + if first >= first_block_to_remove { + // Entire shard is above threshold - delete it + batch.delete::(key.clone())?; + continue; + } + + // This shard has indices below the threshold + if first_block_to_remove <= highest { + // Boundary shard: filter and rewrite as sentinel + batch.delete::(key.clone())?; + let filtered: Vec = + list.iter().take_while(|block| *block < first_block_to_remove).collect(); + if !filtered.is_empty() { + reinsert = Some(filtered); + } + } + // Earlier shards are entirely below threshold - keep them unchanged + break; + } + + if let Some(indices) = reinsert { + let sentinel_key = mk_sentinel_key(&pk); + let new_list = BlockNumberList::new_pre_sorted(indices); + batch.put::(sentinel_key, &new_list)?; } } @@ -535,7 +569,7 @@ where /// Unwind storage history indices using `RocksDB`. /// /// Takes a set of affected (address, `storage_key`) pairs and removes all block numbers -/// >= `first_block_to_remove` from the history indices. +/// >= `first_block_to_remove` from the history indices across ALL shards. #[cfg(all(unix, feature = "rocksdb"))] pub(crate) fn unwind_storage_history_via_rocksdb( provider: &Provider, @@ -547,18 +581,23 @@ where { use reth_db_api::models::storage_sharded_key::StorageShardedKey; - unwind_history_via_rocksdb_generic::<_, reth_db_api::tables::StoragesHistory, _, _>( + unwind_history_via_rocksdb_generic::<_, reth_db_api::tables::StoragesHistory, _, _, _>( provider, affected_keys, first_block_to_remove, - |(address, storage_key)| StorageShardedKey::new(address, storage_key, u64::MAX), + |(address, storage_key)| StorageShardedKey::new(*address, *storage_key, 0), + |(address, storage_key)| StorageShardedKey::new(*address, *storage_key, u64::MAX), + |storage_sharded_key, (address, storage_key)| { + storage_sharded_key.address == *address && + storage_sharded_key.sharded_key.key == *storage_key + }, ) } /// Unwind account history indices using `RocksDB`. /// /// Takes a set of affected addresses and removes all block numbers -/// >= `first_block_to_remove` from the history indices. +/// >= `first_block_to_remove` from the history indices across ALL shards. #[cfg(all(unix, feature = "rocksdb"))] pub(crate) fn unwind_account_history_via_rocksdb( provider: &Provider, @@ -570,11 +609,13 @@ where { use reth_db_api::models::ShardedKey; - unwind_history_via_rocksdb_generic::<_, reth_db_api::tables::AccountsHistory, _, _>( + unwind_history_via_rocksdb_generic::<_, reth_db_api::tables::AccountsHistory, _, _, _>( provider, affected_addresses, first_block_to_remove, - |address| ShardedKey::new(address, u64::MAX), + |address| ShardedKey::new(*address, 0), + |address| ShardedKey::new(*address, u64::MAX), + |sharded_key, address| sharded_key.key == *address, ) }