From 89be91de0ef52b7c4fbdaf8b2dc48b42214b6484 Mon Sep 17 00:00:00 2001 From: Dan Cline <6798349+Rjected@users.noreply.github.com> Date: Wed, 4 Feb 2026 06:48:22 +0000 Subject: [PATCH] perf(pruner): do not create an iterator_cf for every address (#21767) Co-authored-by: yongkangc --- .../src/segments/user/account_history.rs | 19 +- .../src/segments/user/storage_history.rs | 21 +- crates/storage/provider/src/lib.rs | 2 +- crates/storage/provider/src/providers/mod.rs | 4 +- .../provider/src/providers/rocksdb/mod.rs | 4 +- .../src/providers/rocksdb/provider.rs | 434 ++++++++++++++++++ .../provider/src/providers/rocksdb_stub.rs | 11 + 7 files changed, 470 insertions(+), 25 deletions(-) diff --git a/crates/prune/prune/src/segments/user/account_history.rs b/crates/prune/prune/src/segments/user/account_history.rs index 216f0d6c0c..e3ab12f9a5 100644 --- a/crates/prune/prune/src/segments/user/account_history.rs +++ b/crates/prune/prune/src/segments/user/account_history.rs @@ -238,8 +238,6 @@ impl AccountHistory { where Provider: DBProvider + StaticFileProviderFactory + ChangeSetReader + RocksDBProviderFactory, { - use reth_provider::PruneShardOutcome; - // Unlike MDBX path, we don't divide the limit by 2 because RocksDB path only prunes // history shards (no separate changeset table to delete from). The changesets are in // static files which are deleted separately. @@ -287,14 +285,15 @@ impl AccountHistory { sorted_accounts.sort_unstable_by_key(|(addr, _)| *addr); provider.with_rocksdb_batch(|mut batch| { - for (address, highest_block) in &sorted_accounts { - let prune_to = (*highest_block).min(last_changeset_pruned_block); - match batch.prune_account_history_to(*address, prune_to)? { - PruneShardOutcome::Deleted => deleted_shards += 1, - PruneShardOutcome::Updated => updated_shards += 1, - PruneShardOutcome::Unchanged => {} - } - } + let targets: Vec<_> = sorted_accounts + .iter() + .map(|(addr, highest)| (*addr, (*highest).min(last_changeset_pruned_block))) + .collect(); + + let outcomes = batch.prune_account_history_batch(&targets)?; + deleted_shards = outcomes.deleted; + updated_shards = outcomes.updated; + Ok(((), Some(batch.into_inner()))) })?; trace!(target: "pruner", deleted = deleted_shards, updated = updated_shards, %done, "Pruned account history (RocksDB indices)"); diff --git a/crates/prune/prune/src/segments/user/storage_history.rs b/crates/prune/prune/src/segments/user/storage_history.rs index 47c5ab1364..780ed51629 100644 --- a/crates/prune/prune/src/segments/user/storage_history.rs +++ b/crates/prune/prune/src/segments/user/storage_history.rs @@ -242,8 +242,6 @@ impl StorageHistory { where Provider: DBProvider + StaticFileProviderFactory + RocksDBProviderFactory, { - use reth_provider::PruneShardOutcome; - let mut limiter = input.limiter; if limiter.is_limit_reached() { @@ -291,14 +289,17 @@ impl StorageHistory { sorted_storages.sort_unstable_by_key(|((addr, key), _)| (*addr, *key)); provider.with_rocksdb_batch(|mut batch| { - for ((address, storage_key), highest_block) in &sorted_storages { - let prune_to = (*highest_block).min(last_changeset_pruned_block); - match batch.prune_storage_history_to(*address, *storage_key, prune_to)? 
{ - PruneShardOutcome::Deleted => deleted_shards += 1, - PruneShardOutcome::Updated => updated_shards += 1, - PruneShardOutcome::Unchanged => {} - } - } + let targets: Vec<_> = sorted_storages + .iter() + .map(|((addr, key), highest)| { + ((*addr, *key), (*highest).min(last_changeset_pruned_block)) + }) + .collect(); + + let outcomes = batch.prune_storage_history_batch(&targets)?; + deleted_shards = outcomes.deleted; + updated_shards = outcomes.updated; + Ok(((), Some(batch.into_inner()))) })?; diff --git a/crates/storage/provider/src/lib.rs b/crates/storage/provider/src/lib.rs index f6a24925e1..6f3f2295f8 100644 --- a/crates/storage/provider/src/lib.rs +++ b/crates/storage/provider/src/lib.rs @@ -21,7 +21,7 @@ pub mod providers; pub use providers::{ DatabaseProvider, DatabaseProviderRO, DatabaseProviderRW, HistoricalStateProvider, HistoricalStateProviderRef, LatestStateProvider, LatestStateProviderRef, ProviderFactory, - PruneShardOutcome, SaveBlocksMode, StaticFileAccess, StaticFileProviderBuilder, + PruneShardOutcome, PrunedIndices, SaveBlocksMode, StaticFileAccess, StaticFileProviderBuilder, StaticFileWriteCtx, StaticFileWriter, }; diff --git a/crates/storage/provider/src/providers/mod.rs b/crates/storage/provider/src/providers/mod.rs index bf32d02e3c..9b8af6de66 100644 --- a/crates/storage/provider/src/providers/mod.rs +++ b/crates/storage/provider/src/providers/mod.rs @@ -39,8 +39,8 @@ pub use consistent::ConsistentProvider; pub(crate) mod rocksdb; pub use rocksdb::{ - PruneShardOutcome, RocksDBBatch, RocksDBBuilder, RocksDBIter, RocksDBProvider, RocksDBRawIter, - RocksDBStats, RocksDBTableStats, RocksTx, + PruneShardOutcome, PrunedIndices, RocksDBBatch, RocksDBBuilder, RocksDBIter, RocksDBProvider, + RocksDBRawIter, RocksDBStats, RocksDBTableStats, RocksTx, }; /// Helper trait to bound [`NodeTypes`] so that combined with database they satisfy diff --git a/crates/storage/provider/src/providers/rocksdb/mod.rs b/crates/storage/provider/src/providers/rocksdb/mod.rs index 4820cd0742..6c734ab4ff 100644 --- a/crates/storage/provider/src/providers/rocksdb/mod.rs +++ b/crates/storage/provider/src/providers/rocksdb/mod.rs @@ -6,6 +6,6 @@ mod provider; pub(crate) use provider::{PendingRocksDBBatches, RocksDBWriteCtx}; pub use provider::{ - PruneShardOutcome, RocksDBBatch, RocksDBBuilder, RocksDBIter, RocksDBProvider, RocksDBRawIter, - RocksDBStats, RocksDBTableStats, RocksTx, + PruneShardOutcome, PrunedIndices, RocksDBBatch, RocksDBBuilder, RocksDBIter, RocksDBProvider, + RocksDBRawIter, RocksDBStats, RocksDBTableStats, RocksTx, }; diff --git a/crates/storage/provider/src/providers/rocksdb/provider.rs b/crates/storage/provider/src/providers/rocksdb/provider.rs index a7ed4ed8be..38bd5ae282 100644 --- a/crates/storage/provider/src/providers/rocksdb/provider.rs +++ b/crates/storage/provider/src/providers/rocksdb/provider.rs @@ -474,6 +474,17 @@ impl RocksDBProviderInner { } } + /// Returns a raw iterator over a column family. + /// + /// Unlike [`Self::iterator_cf`], raw iterators support `seek()` for efficient + /// repositioning without creating a new iterator. + fn raw_iterator_cf(&self, cf: &rocksdb::ColumnFamily) -> RocksDBRawIterEnum<'_> { + match self { + Self::ReadWrite { db, .. } => RocksDBRawIterEnum::ReadWrite(db.raw_iterator_cf(cf)), + Self::ReadOnly { db, .. } => RocksDBRawIterEnum::ReadOnly(db.raw_iterator_cf(cf)), + } + } + /// Returns the path to the database directory. 
fn path(&self) -> &Path { match self { @@ -1317,6 +1328,17 @@ pub enum PruneShardOutcome { Unchanged, } +/// Tracks pruning outcomes for batch operations. +#[derive(Debug, Default, Clone, Copy)] +pub struct PrunedIndices { + /// Number of shards completely deleted. + pub deleted: usize, + /// Number of shards that were updated (filtered but still have entries). + pub updated: usize, + /// Number of shards that were unchanged. + pub unchanged: usize, +} + /// Handle for building a batch of operations atomically. /// /// Uses `WriteBatchWithTransaction` for atomic writes without full transaction overhead. @@ -1743,6 +1765,109 @@ impl<'a> RocksDBBatch<'a> { ) } + /// Prunes account history for multiple addresses in a single iterator pass. + /// + /// This is more efficient than calling [`Self::prune_account_history_to`] repeatedly + /// because it reuses a single raw iterator and skips seeks when the iterator is already + /// positioned correctly (which happens when targets are sorted and adjacent in key order). + /// + /// `targets` MUST be sorted by address for correctness and optimal performance + /// (matches on-disk key order). + pub fn prune_account_history_batch( + &mut self, + targets: &[(Address, BlockNumber)], + ) -> ProviderResult { + if targets.is_empty() { + return Ok(PrunedIndices::default()); + } + + debug_assert!( + targets.windows(2).all(|w| w[0].0 <= w[1].0), + "prune_account_history_batch: targets must be sorted by address" + ); + + // ShardedKey
layout: [address: 20][block: 8] = 28 bytes + // The first 20 bytes are the "prefix" that identifies the address + const PREFIX_LEN: usize = 20; + + let cf = self.provider.get_cf_handle::()?; + let mut iter = self.provider.0.raw_iterator_cf(cf); + let mut outcomes = PrunedIndices::default(); + + for (address, to_block) in targets { + // Build the target prefix (first 20 bytes = address) + let start_key = ShardedKey::new(*address, 0u64).encode(); + let target_prefix = &start_key[..PREFIX_LEN]; + + // Check if we need to seek or if the iterator is already positioned correctly. + // After processing the previous target, the iterator is either: + // 1. Positioned at a key with a different prefix (we iterated past our shards) + // 2. Invalid (no more keys) + // If the current key's prefix >= our target prefix, we may be able to skip the seek. + let needs_seek = if iter.valid() { + if let Some(current_key) = iter.key() { + // If current key's prefix < target prefix, we need to seek forward + // If current key's prefix > target prefix, this target has no shards (skip) + // If current key's prefix == target prefix, we're already positioned + current_key.get(..PREFIX_LEN).is_none_or(|p| p < target_prefix) + } else { + true + } + } else { + true + }; + + if needs_seek { + iter.seek(start_key); + iter.status().map_err(|e| { + ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo { + message: e.to_string().into(), + code: -1, + })) + })?; + } + + // Collect all shards for this address using raw prefix comparison + let mut shards = Vec::new(); + while iter.valid() { + let Some(key_bytes) = iter.key() else { break }; + + // Use raw prefix comparison instead of full decode for the prefix check + let current_prefix = key_bytes.get(..PREFIX_LEN); + if current_prefix != Some(target_prefix) { + break; + } + + // Now decode the full key (we need the block number) + let key = ShardedKey::
::decode(key_bytes) + .map_err(|_| ProviderError::Database(DatabaseError::Decode))?; + + let Some(value_bytes) = iter.value() else { break }; + let value = BlockNumberList::decompress(value_bytes) + .map_err(|_| ProviderError::Database(DatabaseError::Decode))?; + + shards.push((key, value)); + iter.next(); + } + + match self.prune_history_shards_inner( + shards, + *to_block, + |key| key.highest_block_number, + |key| key.highest_block_number == u64::MAX, + |batch, key| batch.delete::(key), + |batch, key, value| batch.put::(key, value), + || ShardedKey::new(*address, u64::MAX), + )? { + PruneShardOutcome::Deleted => outcomes.deleted += 1, + PruneShardOutcome::Updated => outcomes.updated += 1, + PruneShardOutcome::Unchanged => outcomes.unchanged += 1, + } + } + + Ok(outcomes) + } + /// Prunes storage history for the given address and storage key, removing blocks <= /// `to_block`. /// @@ -1766,6 +1891,111 @@ impl<'a> RocksDBBatch<'a> { ) } + /// Prunes storage history for multiple (address, `storage_key`) pairs in a single iterator + /// pass. + /// + /// This is more efficient than calling [`Self::prune_storage_history_to`] repeatedly + /// because it reuses a single raw iterator and skips seeks when the iterator is already + /// positioned correctly (which happens when targets are sorted and adjacent in key order). + /// + /// `targets` MUST be sorted by (address, `storage_key`) for correctness and optimal + /// performance (matches on-disk key order). + pub fn prune_storage_history_batch( + &mut self, + targets: &[((Address, B256), BlockNumber)], + ) -> ProviderResult { + if targets.is_empty() { + return Ok(PrunedIndices::default()); + } + + debug_assert!( + targets.windows(2).all(|w| w[0].0 <= w[1].0), + "prune_storage_history_batch: targets must be sorted by (address, storage_key)" + ); + + // StorageShardedKey layout: [address: 20][storage_key: 32][block: 8] = 60 bytes + // The first 52 bytes are the "prefix" that identifies (address, storage_key) + const PREFIX_LEN: usize = 52; + + let cf = self.provider.get_cf_handle::()?; + let mut iter = self.provider.0.raw_iterator_cf(cf); + let mut outcomes = PrunedIndices::default(); + + for ((address, storage_key), to_block) in targets { + // Build the target prefix (first 52 bytes of encoded key) + let start_key = StorageShardedKey::new(*address, *storage_key, 0u64).encode(); + let target_prefix = &start_key[..PREFIX_LEN]; + + // Check if we need to seek or if the iterator is already positioned correctly. + // After processing the previous target, the iterator is either: + // 1. Positioned at a key with a different prefix (we iterated past our shards) + // 2. Invalid (no more keys) + // If the current key's prefix >= our target prefix, we may be able to skip the seek. 
+ let needs_seek = if iter.valid() { + if let Some(current_key) = iter.key() { + // If current key's prefix < target prefix, we need to seek forward + // If current key's prefix > target prefix, this target has no shards (skip) + // If current key's prefix == target prefix, we're already positioned + current_key.get(..PREFIX_LEN).is_none_or(|p| p < target_prefix) + } else { + true + } + } else { + true + }; + + if needs_seek { + iter.seek(start_key); + iter.status().map_err(|e| { + ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo { + message: e.to_string().into(), + code: -1, + })) + })?; + } + + // Collect all shards for this (address, storage_key) pair using prefix comparison + let mut shards = Vec::new(); + while iter.valid() { + let Some(key_bytes) = iter.key() else { break }; + + // Use raw prefix comparison instead of full decode for the prefix check + let current_prefix = key_bytes.get(..PREFIX_LEN); + if current_prefix != Some(target_prefix) { + break; + } + + // Now decode the full key (we need the block number) + let key = StorageShardedKey::decode(key_bytes) + .map_err(|_| ProviderError::Database(DatabaseError::Decode))?; + + let Some(value_bytes) = iter.value() else { break }; + let value = BlockNumberList::decompress(value_bytes) + .map_err(|_| ProviderError::Database(DatabaseError::Decode))?; + + shards.push((key, value)); + iter.next(); + } + + // Use existing prune_history_shards_inner logic + match self.prune_history_shards_inner( + shards, + *to_block, + |key| key.sharded_key.highest_block_number, + |key| key.sharded_key.highest_block_number == u64::MAX, + |batch, key| batch.delete::(key), + |batch, key, value| batch.put::(key, value), + || StorageShardedKey::last(*address, *storage_key), + )? { + PruneShardOutcome::Deleted => outcomes.deleted += 1, + PruneShardOutcome::Updated => outcomes.updated += 1, + PruneShardOutcome::Unchanged => outcomes.unchanged += 1, + } + } + + Ok(outcomes) + } + /// Unwinds storage history to keep only blocks `<= keep_to`. /// /// Handles multi-shard scenarios by: @@ -2157,6 +2387,67 @@ impl Iterator for RocksDBIterEnum<'_> { } } +/// Wrapper enum for raw `RocksDB` iterators that works in both read-write and read-only modes. +/// +/// Unlike [`RocksDBIterEnum`], raw iterators expose `seek()` for efficient repositioning +/// without reinitializing the iterator. +enum RocksDBRawIterEnum<'db> { + /// Raw iterator from read-write `OptimisticTransactionDB`. + ReadWrite(DBRawIteratorWithThreadMode<'db, OptimisticTransactionDB>), + /// Raw iterator from read-only `DB`. + ReadOnly(DBRawIteratorWithThreadMode<'db, DB>), +} + +impl RocksDBRawIterEnum<'_> { + /// Positions the iterator at the first key >= `key`. + fn seek(&mut self, key: impl AsRef<[u8]>) { + match self { + Self::ReadWrite(iter) => iter.seek(key), + Self::ReadOnly(iter) => iter.seek(key), + } + } + + /// Returns true if the iterator is positioned at a valid key-value pair. + fn valid(&self) -> bool { + match self { + Self::ReadWrite(iter) => iter.valid(), + Self::ReadOnly(iter) => iter.valid(), + } + } + + /// Returns the current key, if valid. + fn key(&self) -> Option<&[u8]> { + match self { + Self::ReadWrite(iter) => iter.key(), + Self::ReadOnly(iter) => iter.key(), + } + } + + /// Returns the current value, if valid. + fn value(&self) -> Option<&[u8]> { + match self { + Self::ReadWrite(iter) => iter.value(), + Self::ReadOnly(iter) => iter.value(), + } + } + + /// Advances the iterator to the next key. 
+ fn next(&mut self) { + match self { + Self::ReadWrite(iter) => iter.next(), + Self::ReadOnly(iter) => iter.next(), + } + } + + /// Returns the status of the iterator. + fn status(&self) -> Result<(), rocksdb::Error> { + match self { + Self::ReadWrite(iter) => iter.status(), + Self::ReadOnly(iter) => iter.status(), + } + } +} + /// Iterator over a `RocksDB` table (non-transactional). /// /// Yields decoded `(Key, Value)` pairs in key order. @@ -3584,4 +3875,147 @@ mod tests { } } } + + #[test] + fn test_prune_account_history_batch_multiple_sorted_targets() { + let temp_dir = TempDir::new().unwrap(); + let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap(); + + let addr1 = Address::from([0x01; 20]); + let addr2 = Address::from([0x02; 20]); + let addr3 = Address::from([0x03; 20]); + + // Setup shards for each address + let mut batch = provider.batch(); + batch + .put::( + ShardedKey::new(addr1, u64::MAX), + &BlockNumberList::new_pre_sorted([10, 20, 30]), + ) + .unwrap(); + batch + .put::( + ShardedKey::new(addr2, u64::MAX), + &BlockNumberList::new_pre_sorted([5, 10, 15]), + ) + .unwrap(); + batch + .put::( + ShardedKey::new(addr3, u64::MAX), + &BlockNumberList::new_pre_sorted([100, 200]), + ) + .unwrap(); + batch.commit().unwrap(); + + // Prune all three (sorted by address) + let mut targets = vec![(addr1, 15), (addr2, 10), (addr3, 50)]; + targets.sort_by_key(|(addr, _)| *addr); + + let mut batch = provider.batch(); + let outcomes = batch.prune_account_history_batch(&targets).unwrap(); + batch.commit().unwrap(); + + // addr1: prune <=15, keep [20, 30] -> updated + // addr2: prune <=10, keep [15] -> updated + // addr3: prune <=50, keep [100, 200] -> unchanged + assert_eq!(outcomes.updated, 2); + assert_eq!(outcomes.unchanged, 1); + + let shards1 = provider.account_history_shards(addr1).unwrap(); + assert_eq!(shards1[0].1.iter().collect::>(), vec![20, 30]); + + let shards2 = provider.account_history_shards(addr2).unwrap(); + assert_eq!(shards2[0].1.iter().collect::>(), vec![15]); + + let shards3 = provider.account_history_shards(addr3).unwrap(); + assert_eq!(shards3[0].1.iter().collect::>(), vec![100, 200]); + } + + #[test] + fn test_prune_account_history_batch_target_with_no_shards() { + let temp_dir = TempDir::new().unwrap(); + let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap(); + + let addr1 = Address::from([0x01; 20]); + let addr2 = Address::from([0x02; 20]); // No shards for this one + let addr3 = Address::from([0x03; 20]); + + // Only setup shards for addr1 and addr3 + let mut batch = provider.batch(); + batch + .put::( + ShardedKey::new(addr1, u64::MAX), + &BlockNumberList::new_pre_sorted([10, 20]), + ) + .unwrap(); + batch + .put::( + ShardedKey::new(addr3, u64::MAX), + &BlockNumberList::new_pre_sorted([30, 40]), + ) + .unwrap(); + batch.commit().unwrap(); + + // Prune all three (addr2 has no shards - tests p > target_prefix case) + let mut targets = vec![(addr1, 15), (addr2, 100), (addr3, 35)]; + targets.sort_by_key(|(addr, _)| *addr); + + let mut batch = provider.batch(); + let outcomes = batch.prune_account_history_batch(&targets).unwrap(); + batch.commit().unwrap(); + + // addr1: updated (keep [20]) + // addr2: unchanged (no shards) + // addr3: updated (keep [40]) + assert_eq!(outcomes.updated, 2); + assert_eq!(outcomes.unchanged, 1); + + let shards1 = provider.account_history_shards(addr1).unwrap(); + assert_eq!(shards1[0].1.iter().collect::>(), vec![20]); + + let shards3 = 
provider.account_history_shards(addr3).unwrap(); + assert_eq!(shards3[0].1.iter().collect::>(), vec![40]); + } + + #[test] + fn test_prune_storage_history_batch_multiple_sorted_targets() { + let temp_dir = TempDir::new().unwrap(); + let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap(); + + let addr = Address::from([0x42; 20]); + let slot1 = B256::from([0x01; 32]); + let slot2 = B256::from([0x02; 32]); + + // Setup shards + let mut batch = provider.batch(); + batch + .put::( + StorageShardedKey::new(addr, slot1, u64::MAX), + &BlockNumberList::new_pre_sorted([10, 20, 30]), + ) + .unwrap(); + batch + .put::( + StorageShardedKey::new(addr, slot2, u64::MAX), + &BlockNumberList::new_pre_sorted([5, 15, 25]), + ) + .unwrap(); + batch.commit().unwrap(); + + // Prune both (sorted) + let mut targets = vec![((addr, slot1), 15), ((addr, slot2), 10)]; + targets.sort_by_key(|((a, s), _)| (*a, *s)); + + let mut batch = provider.batch(); + let outcomes = batch.prune_storage_history_batch(&targets).unwrap(); + batch.commit().unwrap(); + + assert_eq!(outcomes.updated, 2); + + let shards1 = provider.storage_history_shards(addr, slot1).unwrap(); + assert_eq!(shards1[0].1.iter().collect::>(), vec![20, 30]); + + let shards2 = provider.storage_history_shards(addr, slot2).unwrap(); + assert_eq!(shards2[0].1.iter().collect::>(), vec![15, 25]); + } } diff --git a/crates/storage/provider/src/providers/rocksdb_stub.rs b/crates/storage/provider/src/providers/rocksdb_stub.rs index 93e79ed320..ff84f1b17e 100644 --- a/crates/storage/provider/src/providers/rocksdb_stub.rs +++ b/crates/storage/provider/src/providers/rocksdb_stub.rs @@ -219,3 +219,14 @@ pub enum PruneShardOutcome { /// Shard was unchanged (no blocks <= `to_block`). Unchanged, } + +/// Tracks pruning outcomes for batch operations (stub). +#[derive(Debug, Default, Clone, Copy)] +pub struct PrunedIndices { + /// Number of shards completely deleted. + pub deleted: usize, + /// Number of shards that were updated (filtered but still have entries). + pub updated: usize, + /// Number of shards that were unchanged. + pub unchanged: usize, +}
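
For reference, the seek-reuse pattern behind `prune_account_history_batch` and `prune_storage_history_batch` can be reduced to the standalone sketch below, written against the `rocksdb` crate directly. The key layout, prefix length, directory name, and `collect_by_prefixes` helper are illustrative only (this is not the reth `ShardedKey` encoding); the point is that a single raw iterator serves an entire sorted list of prefixes, seeking only when it has not already advanced to or past the next prefix.

    // Minimal sketch, assuming a hypothetical key layout of [prefix: 4][suffix]
    // and the `rocksdb` crate's raw-iterator API.
    use rocksdb::{DB, Options};

    const PREFIX_LEN: usize = 4;

    /// Collects all keys for each prefix using one shared raw iterator.
    /// `prefixes` must be sorted so the iterator only ever moves forward.
    fn collect_by_prefixes(
        db: &DB,
        prefixes: &[[u8; PREFIX_LEN]],
    ) -> Result<Vec<Vec<Vec<u8>>>, rocksdb::Error> {
        debug_assert!(prefixes.windows(2).all(|w| w[0] <= w[1]));

        let mut iter = db.raw_iterator();
        let mut out = Vec::with_capacity(prefixes.len());

        for prefix in prefixes {
            // Seek only if the iterator is invalid or still positioned before this
            // prefix; if it is already at (or past) it, the previous pass left the
            // iterator where we need it and the seek is skipped.
            let needs_seek = match iter.key() {
                Some(k) => k.get(..PREFIX_LEN).is_none_or(|p| p < prefix.as_slice()),
                None => true,
            };
            if needs_seek {
                iter.seek(prefix);
                iter.status()?;
            }

            // Collect every key sharing the prefix, advancing the shared iterator.
            let mut keys = Vec::new();
            while let Some(k) = iter.key() {
                if k.get(..PREFIX_LEN) != Some(prefix.as_slice()) {
                    break;
                }
                keys.push(k.to_vec());
                iter.next();
            }
            out.push(keys);
        }
        Ok(out)
    }

    fn main() -> Result<(), rocksdb::Error> {
        let dir = std::env::temp_dir().join("raw-iter-seek-reuse-demo");
        let _ = DB::destroy(&Options::default(), &dir);
        let db = DB::open_default(&dir)?;
        for key in [b"aaaa0001", b"aaaa0002", b"bbbb0001", b"dddd0001"] {
            db.put(key, b"")?;
        }
        // One iterator serves all three lookups; "cccc" simply yields no keys,
        // mirroring the "target has no shards" case in the batch prune methods.
        let groups = collect_by_prefixes(&db, &[*b"aaaa", *b"cccc", *b"dddd"])?;
        assert_eq!(groups[0].len(), 2);
        assert!(groups[1].is_empty());
        assert_eq!(groups[2].len(), 1);
        let _ = DB::destroy(&Options::default(), &dir);
        Ok(())
    }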