diff --git a/crates/stages/stages/src/stages/index_account_history.rs b/crates/stages/stages/src/stages/index_account_history.rs index 0a43d14307..35c45b4b65 100644 --- a/crates/stages/stages/src/stages/index_account_history.rs +++ b/crates/stages/stages/src/stages/index_account_history.rs @@ -812,5 +812,40 @@ mod tests { .collect(); assert_eq!(blocks, (0..=5).collect::>()); } + + /// Verifies unwind correctly handles indices spanning multiple shards. + /// + /// Creates enough blocks to fill one complete shard plus overflow, then unwinds + /// to a point inside the first shard. Verifies: + /// - Sentinel shard contains only blocks <= unwind_to + /// - The old full shard (keyed by its max block) is deleted + #[tokio::test] + async fn unwind_crosses_shard_boundary() { + let db = TestStageDB::default(); + let max_block = NUM_OF_INDICES_IN_SHARD as u64 + 10; + let unwind_to = NUM_OF_INDICES_IN_SHARD as u64 - 10; + + setup_with_changesets(&db, 0..=max_block); + run(&db, max_block, None); + unwind(&db, max_block, unwind_to); + + let rocksdb = db.factory.rocksdb_provider(); + + // Sentinel shard should contain only blocks 0..=unwind_to + let blocks: Vec = rocksdb + .get::(shard(u64::MAX)) + .unwrap() + .unwrap() + .iter() + .collect(); + assert_eq!(blocks, (0..=unwind_to).collect::>()); + + // The old full shard should be deleted + let full_shard_max = NUM_OF_INDICES_IN_SHARD as u64 - 1; + assert!(rocksdb + .get::(shard(full_shard_max)) + .unwrap() + .is_none()); + } } } diff --git a/crates/stages/stages/src/stages/index_storage_history.rs b/crates/stages/stages/src/stages/index_storage_history.rs index 49b8c4fc33..3d64f68a1d 100644 --- a/crates/stages/stages/src/stages/index_storage_history.rs +++ b/crates/stages/stages/src/stages/index_storage_history.rs @@ -846,5 +846,40 @@ mod tests { .collect(); assert_eq!(blocks, (0..=5).collect::>()); } + + /// Verifies unwind correctly handles indices spanning multiple shards. 
+ /// + /// Creates enough blocks to fill one complete shard plus overflow, then unwinds + /// to a point inside the first shard. Verifies: + /// - Sentinel shard contains only blocks <= unwind_to + /// - The old full shard (keyed by its max block) is deleted + #[tokio::test] + async fn unwind_crosses_shard_boundary() { + let db = TestStageDB::default(); + let max_block = NUM_OF_INDICES_IN_SHARD as u64 + 10; + let unwind_to = NUM_OF_INDICES_IN_SHARD as u64 - 10; + + setup_with_changesets(&db, 0..=max_block); + run(&db, max_block, None); + unwind(&db, max_block, unwind_to); + + let rocksdb = db.factory.rocksdb_provider(); + + // Sentinel shard should contain only blocks 0..=unwind_to + let blocks: Vec = rocksdb + .get::(shard(u64::MAX)) + .unwrap() + .unwrap() + .iter() + .collect(); + assert_eq!(blocks, (0..=unwind_to).collect::>()); + + // The old full shard should be deleted + let full_shard_max = NUM_OF_INDICES_IN_SHARD as u64 - 1; + assert!(rocksdb + .get::(shard(full_shard_max)) + .unwrap() + .is_none()); + } } } diff --git a/crates/stages/stages/src/stages/utils.rs b/crates/stages/stages/src/stages/utils.rs index b2bef3dc7b..bfc1decbe5 100644 --- a/crates/stages/stages/src/stages/utils.rs +++ b/crates/stages/stages/src/stages/utils.rs @@ -452,7 +452,7 @@ fn flush_account_shards( partial_key: alloy_primitives::Address, list: &mut Vec, append_only: bool, - flush: bool, + flush_all: bool, ) -> Result<(), StageError> where CURSOR: DbCursorRW @@ -461,15 +461,15 @@ where { use reth_db_api::models::ShardedKey; - if list.len() > NUM_OF_INDICES_IN_SHARD || flush { + if list.len() > NUM_OF_INDICES_IN_SHARD || flush_all { let chunks = list.chunks(NUM_OF_INDICES_IN_SHARD).map(|c| c.to_vec()).collect::>>(); let mut iter = chunks.into_iter().peekable(); while let Some(chunk) = iter.next() { - let mut highest = *chunk.last().expect("at least one index"); + let mut highest = *chunk.last().expect("BlockNumberList shard chunk must be non-empty"); - if !flush && 
iter.peek().is_none() { + if !flush_all && iter.peek().is_none() { *list = chunk; } else { if iter.peek().is_none() { @@ -487,44 +487,78 @@ where /// Generic helper to unwind history indices using `RocksDB`. /// -/// For each affected partial key: -/// 1. Reads the existing shard from `RocksDB` (the one with `u64::MAX` sentinel) -/// 2. Filters out block numbers >= `first_block_to_remove` -/// 3. If the result is empty, deletes the entry -/// 4. Otherwise, writes back the truncated shard -/// -/// Note: This only handles the sentinel shard (`u64::MAX`). This is correct for typical -/// unwind operations where recently-added blocks are always in the sentinel shard. +/// For each affected partial key, iterates shards in reverse order and: +/// 1. Deletes shards where all indices are `>= first_block_to_remove` +/// 2. For the boundary shard (contains indices both above and below the threshold), filters to keep +/// only indices `< first_block_to_remove` and rewrites as sentinel +/// 3. Preserves earlier shards unchanged +/// 4. If no indices remain after filtering, the key is fully removed #[cfg(all(unix, feature = "rocksdb"))] -fn unwind_history_via_rocksdb_generic( +fn unwind_history_via_rocksdb_generic( provider: &Provider, affected_keys: impl IntoIterator, first_block_to_remove: BlockNumber, - mk_sentinel_key: impl Fn(PK) -> K, + mk_start_key: impl Fn(&PK) -> K, + mk_sentinel_key: impl Fn(&PK) -> K, + shard_belongs_to_key: impl Fn(&K, &PK) -> bool, ) -> Result where Provider: DBProvider + reth_provider::RocksDBProviderFactory, T: reth_db_api::table::Table, - K: Clone, + K: Clone + AsRef>, { let rocksdb = provider.rocksdb_provider(); + let tx = rocksdb.tx(); let mut batch = rocksdb.batch(); let mut count = 0; for pk in affected_keys { - let key = mk_sentinel_key(pk); - if let Some(existing_list) = rocksdb.get::(key.clone())? 
{ - // Keep blocks < first_block_to_remove (matches MDBX semantics) - let filtered: Vec = - existing_list.iter().filter(|&block| block < first_block_to_remove).collect(); + let start_key = mk_start_key(&pk); + let mut shards = Vec::<(K, BlockNumberList)>::new(); - if filtered.is_empty() { - batch.delete::(key)?; - } else { - let new_list = BlockNumberList::new_pre_sorted(filtered); - batch.put::(key, &new_list)?; + for entry in tx.iter_from::(start_key)? { + let (key, list) = entry?; + if !shard_belongs_to_key(&key, &pk) { + break; } - count += 1; + shards.push((key, list)); + } + + if shards.is_empty() { + continue; + } + + count += 1; + let mut reinsert: Option> = None; + + for (key, list) in shards.iter().rev() { + let first = list.iter().next().expect("BlockNumberList shard must be non-empty"); + let highest = key.as_ref().highest_block_number; + + if first >= first_block_to_remove { + // Entire shard is above threshold - delete it + batch.delete::(key.clone())?; + continue; + } + + // This shard has indices below the threshold + if first_block_to_remove <= highest { + // Boundary shard: filter and rewrite as sentinel + batch.delete::(key.clone())?; + let filtered: Vec = + list.iter().take_while(|block| *block < first_block_to_remove).collect(); + if !filtered.is_empty() { + reinsert = Some(filtered); + } + } + // Earlier shards are entirely below threshold - keep them unchanged + break; + } + + if let Some(indices) = reinsert { + let sentinel_key = mk_sentinel_key(&pk); + let new_list = BlockNumberList::new_pre_sorted(indices); + batch.put::(sentinel_key, &new_list)?; } } @@ -535,7 +569,7 @@ where /// Unwind storage history indices using `RocksDB`. /// /// Takes a set of affected (address, `storage_key`) pairs and removes all block numbers -/// >= `first_block_to_remove` from the history indices. +/// >= `first_block_to_remove` from the history indices across ALL shards. 
#[cfg(all(unix, feature = "rocksdb"))] pub(crate) fn unwind_storage_history_via_rocksdb( provider: &Provider, @@ -547,18 +581,23 @@ where { use reth_db_api::models::storage_sharded_key::StorageShardedKey; - unwind_history_via_rocksdb_generic::<_, reth_db_api::tables::StoragesHistory, _, _>( + unwind_history_via_rocksdb_generic::<_, reth_db_api::tables::StoragesHistory, _, _, _>( provider, affected_keys, first_block_to_remove, - |(address, storage_key)| StorageShardedKey::new(address, storage_key, u64::MAX), + |(address, storage_key)| StorageShardedKey::new(*address, *storage_key, 0), + |(address, storage_key)| StorageShardedKey::new(*address, *storage_key, u64::MAX), + |storage_sharded_key, (address, storage_key)| { + storage_sharded_key.address == *address && + storage_sharded_key.sharded_key.key == *storage_key + }, ) } /// Unwind account history indices using `RocksDB`. /// /// Takes a set of affected addresses and removes all block numbers -/// >= `first_block_to_remove` from the history indices. +/// >= `first_block_to_remove` from the history indices across ALL shards. #[cfg(all(unix, feature = "rocksdb"))] pub(crate) fn unwind_account_history_via_rocksdb( provider: &Provider, @@ -570,11 +609,13 @@ where { use reth_db_api::models::ShardedKey; - unwind_history_via_rocksdb_generic::<_, reth_db_api::tables::AccountsHistory, _, _>( + unwind_history_via_rocksdb_generic::<_, reth_db_api::tables::AccountsHistory, _, _, _>( provider, affected_addresses, first_block_to_remove, - |address| ShardedKey::new(address, u64::MAX), + |address| ShardedKey::new(*address, 0), + |address| ShardedKey::new(*address, u64::MAX), + |sharded_key, address| sharded_key.key == *address, ) }