perf(pruner): do not create an iterator_cf for every address (#21767)

Co-authored-by: yongkangc <chiayongkang@hotmail.com>
Authored by Dan Cline on 2026-02-04 06:48:22 +00:00, committed by GitHub
parent 3af5a4a4e2
commit 89be91de0e
7 changed files with 470 additions and 25 deletions

View File

@@ -238,8 +238,6 @@ impl AccountHistory {
where
Provider: DBProvider + StaticFileProviderFactory + ChangeSetReader + RocksDBProviderFactory,
{
use reth_provider::PruneShardOutcome;
// Unlike the MDBX path, we don't divide the limit by 2 because the RocksDB path only
// prunes history shards (there is no separate changeset table to delete from); the
// changesets live in static files, which are deleted separately.
@@ -287,14 +285,15 @@ impl AccountHistory {
sorted_accounts.sort_unstable_by_key(|(addr, _)| *addr);
provider.with_rocksdb_batch(|mut batch| {
for (address, highest_block) in &sorted_accounts {
let prune_to = (*highest_block).min(last_changeset_pruned_block);
match batch.prune_account_history_to(*address, prune_to)? {
PruneShardOutcome::Deleted => deleted_shards += 1,
PruneShardOutcome::Updated => updated_shards += 1,
PruneShardOutcome::Unchanged => {}
}
}
let targets: Vec<_> = sorted_accounts
.iter()
.map(|(addr, highest)| (*addr, (*highest).min(last_changeset_pruned_block)))
.collect();
let outcomes = batch.prune_account_history_batch(&targets)?;
deleted_shards = outcomes.deleted;
updated_shards = outcomes.updated;
Ok(((), Some(batch.into_inner())))
})?;
trace!(target: "pruner", deleted = deleted_shards, updated = updated_shards, %done, "Pruned account history (RocksDB indices)");
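// Aside: a minimal, self-contained sketch (plain std Rust, hypothetical names) of the idea
// behind this change: instead of opening a fresh iterator per address, one ordered cursor is
// walked forward across targets that are sorted in key order.
use std::collections::BTreeMap;

/// `index` stands in for the AccountsHistory column family: keys are
/// (address, highest_block) pairs in the same order the column family sorts them.
fn count_shards_single_pass(
    index: &BTreeMap<([u8; 20], u64), Vec<u64>>,
    sorted_targets: &[[u8; 20]],
) -> usize {
    let mut cursor = index.iter().peekable();
    let mut matched = 0;
    for target in sorted_targets {
        // Fast-forward past entries whose address sorts before the current target
        // (the analogue of the `needs_seek` check in the batch pruner).
        while cursor.peek().is_some_and(|((addr, _), _)| addr < target) {
            cursor.next();
        }
        // Consume every shard belonging to this target.
        while cursor.peek().is_some_and(|((addr, _), _)| addr == target) {
            matched += 1;
            cursor.next();
        }
    }
    matched
}

fn main() {
    let mut index = BTreeMap::new();
    index.insert(([0x01u8; 20], u64::MAX), vec![10, 20, 30]);
    index.insert(([0x03u8; 20], u64::MAX), vec![100, 200]);
    // The middle target has no shards and is skipped without re-seeking.
    let targets = [[0x01u8; 20], [0x02u8; 20], [0x03u8; 20]];
    assert_eq!(count_shards_single_pass(&index, &targets), 2);
}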

View File

@@ -242,8 +242,6 @@ impl StorageHistory {
where
Provider: DBProvider + StaticFileProviderFactory + RocksDBProviderFactory,
{
use reth_provider::PruneShardOutcome;
let mut limiter = input.limiter;
if limiter.is_limit_reached() {
@@ -291,14 +289,17 @@ impl StorageHistory {
sorted_storages.sort_unstable_by_key(|((addr, key), _)| (*addr, *key));
provider.with_rocksdb_batch(|mut batch| {
for ((address, storage_key), highest_block) in &sorted_storages {
let prune_to = (*highest_block).min(last_changeset_pruned_block);
match batch.prune_storage_history_to(*address, *storage_key, prune_to)? {
PruneShardOutcome::Deleted => deleted_shards += 1,
PruneShardOutcome::Updated => updated_shards += 1,
PruneShardOutcome::Unchanged => {}
}
}
let targets: Vec<_> = sorted_storages
.iter()
.map(|((addr, key), highest)| {
((*addr, *key), (*highest).min(last_changeset_pruned_block))
})
.collect();
let outcomes = batch.prune_storage_history_batch(&targets)?;
deleted_shards = outcomes.deleted;
updated_shards = outcomes.updated;
Ok(((), Some(batch.into_inner())))
})?;
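// Aside: a small sketch (plain arrays standing in for Address and B256) of why sorting
// targets by the (address, storage_key) tuple is enough: that order matches the byte order
// of the 52-byte on-disk prefix ([address: 20][storage_key: 32]), so a single forward-only
// cursor can serve every sorted target.
fn storage_prefix(addr: &[u8; 20], slot: &[u8; 32]) -> [u8; 52] {
    let mut out = [0u8; 52];
    out[..20].copy_from_slice(addr);
    out[20..].copy_from_slice(slot);
    out
}

fn main() {
    let a = ([0x01u8; 20], [0xffu8; 32]);
    let b = ([0x02u8; 20], [0x00u8; 32]);
    // Tuple order (address first, then storage key)...
    assert!(a < b);
    // ...agrees with the byte order of the concatenated prefixes.
    assert!(storage_prefix(&a.0, &a.1) < storage_prefix(&b.0, &b.1));
}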

View File

@@ -21,7 +21,7 @@ pub mod providers;
pub use providers::{
DatabaseProvider, DatabaseProviderRO, DatabaseProviderRW, HistoricalStateProvider,
HistoricalStateProviderRef, LatestStateProvider, LatestStateProviderRef, ProviderFactory,
PruneShardOutcome, SaveBlocksMode, StaticFileAccess, StaticFileProviderBuilder,
PruneShardOutcome, PrunedIndices, SaveBlocksMode, StaticFileAccess, StaticFileProviderBuilder,
StaticFileWriteCtx, StaticFileWriter,
};

View File

@@ -39,8 +39,8 @@ pub use consistent::ConsistentProvider;
pub(crate) mod rocksdb;
pub use rocksdb::{
PruneShardOutcome, RocksDBBatch, RocksDBBuilder, RocksDBIter, RocksDBProvider, RocksDBRawIter,
RocksDBStats, RocksDBTableStats, RocksTx,
PruneShardOutcome, PrunedIndices, RocksDBBatch, RocksDBBuilder, RocksDBIter, RocksDBProvider,
RocksDBRawIter, RocksDBStats, RocksDBTableStats, RocksTx,
};
/// Helper trait to bound [`NodeTypes`] so that combined with database they satisfy

View File

@@ -6,6 +6,6 @@ mod provider;
pub(crate) use provider::{PendingRocksDBBatches, RocksDBWriteCtx};
pub use provider::{
PruneShardOutcome, RocksDBBatch, RocksDBBuilder, RocksDBIter, RocksDBProvider, RocksDBRawIter,
RocksDBStats, RocksDBTableStats, RocksTx,
PruneShardOutcome, PrunedIndices, RocksDBBatch, RocksDBBuilder, RocksDBIter, RocksDBProvider,
RocksDBRawIter, RocksDBStats, RocksDBTableStats, RocksTx,
};

View File

@@ -474,6 +474,17 @@ impl RocksDBProviderInner {
}
}
/// Returns a raw iterator over a column family.
///
/// Unlike [`Self::iterator_cf`], raw iterators support `seek()` for efficient
/// repositioning without creating a new iterator.
fn raw_iterator_cf(&self, cf: &rocksdb::ColumnFamily) -> RocksDBRawIterEnum<'_> {
match self {
Self::ReadWrite { db, .. } => RocksDBRawIterEnum::ReadWrite(db.raw_iterator_cf(cf)),
Self::ReadOnly { db, .. } => RocksDBRawIterEnum::ReadOnly(db.raw_iterator_cf(cf)),
}
}
/// Returns the path to the database directory.
fn path(&self) -> &Path {
match self {
@@ -1317,6 +1328,17 @@ pub enum PruneShardOutcome {
Unchanged,
}
/// Tracks pruning outcomes for batch operations.
#[derive(Debug, Default, Clone, Copy)]
pub struct PrunedIndices {
/// Number of shards completely deleted.
pub deleted: usize,
/// Number of shards that were updated (filtered but still have entries).
pub updated: usize,
/// Number of shards that were unchanged.
pub unchanged: usize,
}
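// Hypothetical convenience, not added by this diff: if a caller wanted to combine the
// outcomes of the account and storage passes, summing the counters field-wise would be
// enough, e.g. via an `AddAssign` impl like this sketch.
impl core::ops::AddAssign for PrunedIndices {
    fn add_assign(&mut self, other: Self) {
        self.deleted += other.deleted;
        self.updated += other.updated;
        self.unchanged += other.unchanged;
    }
}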
/// Handle for building a batch of operations atomically.
///
/// Uses `WriteBatchWithTransaction` for atomic writes without full transaction overhead.
@@ -1743,6 +1765,109 @@ impl<'a> RocksDBBatch<'a> {
)
}
/// Prunes account history for multiple addresses in a single iterator pass.
///
/// This is more efficient than calling [`Self::prune_account_history_to`] repeatedly
/// because it reuses a single raw iterator and skips seeks when the iterator is already
/// positioned correctly (which happens when targets are sorted and adjacent in key order).
///
/// `targets` MUST be sorted by address for correctness and optimal performance
/// (matches on-disk key order).
pub fn prune_account_history_batch(
&mut self,
targets: &[(Address, BlockNumber)],
) -> ProviderResult<PrunedIndices> {
if targets.is_empty() {
return Ok(PrunedIndices::default());
}
debug_assert!(
targets.windows(2).all(|w| w[0].0 <= w[1].0),
"prune_account_history_batch: targets must be sorted by address"
);
// ShardedKey<Address> layout: [address: 20][block: 8] = 28 bytes
// The first 20 bytes are the "prefix" that identifies the address
const PREFIX_LEN: usize = 20;
let cf = self.provider.get_cf_handle::<tables::AccountsHistory>()?;
let mut iter = self.provider.0.raw_iterator_cf(cf);
let mut outcomes = PrunedIndices::default();
for (address, to_block) in targets {
// Build the target prefix (first 20 bytes = address)
let start_key = ShardedKey::new(*address, 0u64).encode();
let target_prefix = &start_key[..PREFIX_LEN];
// Check if we need to seek or if the iterator is already positioned correctly.
// After processing the previous target, the iterator is either:
// 1. Positioned at a key with a different prefix (we iterated past our shards)
// 2. Invalid (no more keys)
// If the current key's prefix >= our target prefix, we may be able to skip the seek.
let needs_seek = if iter.valid() {
if let Some(current_key) = iter.key() {
// If current key's prefix < target prefix, we need to seek forward
// If current key's prefix > target prefix, this target has no shards (skip)
// If current key's prefix == target prefix, we're already positioned
current_key.get(..PREFIX_LEN).is_none_or(|p| p < target_prefix)
} else {
true
}
} else {
true
};
if needs_seek {
iter.seek(start_key);
iter.status().map_err(|e| {
ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
message: e.to_string().into(),
code: -1,
}))
})?;
}
// Collect all shards for this address using raw prefix comparison
let mut shards = Vec::new();
while iter.valid() {
let Some(key_bytes) = iter.key() else { break };
// Use raw prefix comparison instead of full decode for the prefix check
let current_prefix = key_bytes.get(..PREFIX_LEN);
if current_prefix != Some(target_prefix) {
break;
}
// Now decode the full key (we need the block number)
let key = ShardedKey::<Address>::decode(key_bytes)
.map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
let Some(value_bytes) = iter.value() else { break };
let value = BlockNumberList::decompress(value_bytes)
.map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
shards.push((key, value));
iter.next();
}
match self.prune_history_shards_inner(
shards,
*to_block,
|key| key.highest_block_number,
|key| key.highest_block_number == u64::MAX,
|batch, key| batch.delete::<tables::AccountsHistory>(key),
|batch, key, value| batch.put::<tables::AccountsHistory>(key, value),
|| ShardedKey::new(*address, u64::MAX),
)? {
PruneShardOutcome::Deleted => outcomes.deleted += 1,
PruneShardOutcome::Updated => outcomes.updated += 1,
PruneShardOutcome::Unchanged => outcomes.unchanged += 1,
}
}
Ok(outcomes)
}
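// Aside: a standalone sketch of the prefix check above. Per the layout comment, the encoded
// ShardedKey<Address> starts with the 20-byte address, so two keys belong to the same
// account iff their first 20 bytes match. The big-endian block suffix below is only an
// illustration; the prefix comparison never looks at it.
fn encode_account_shard_key(address: [u8; 20], highest_block: u64) -> [u8; 28] {
    let mut key = [0u8; 28];
    key[..20].copy_from_slice(&address);
    key[20..].copy_from_slice(&highest_block.to_be_bytes());
    key
}

fn same_account(key: &[u8], target_prefix: &[u8; 20]) -> bool {
    key.get(..20) == Some(target_prefix.as_slice())
}

fn main() {
    let addr = [0x11u8; 20];
    let low = encode_account_shard_key(addr, 100);
    let last = encode_account_shard_key(addr, u64::MAX);
    let other = encode_account_shard_key([0x22u8; 20], 100);
    assert!(same_account(&low, &addr) && same_account(&last, &addr));
    assert!(!same_account(&other, &addr));
}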
/// Prunes storage history for the given address and storage key, removing blocks <=
/// `to_block`.
///
@@ -1766,6 +1891,111 @@ impl<'a> RocksDBBatch<'a> {
)
}
/// Prunes storage history for multiple (address, `storage_key`) pairs in a single iterator
/// pass.
///
/// This is more efficient than calling [`Self::prune_storage_history_to`] repeatedly
/// because it reuses a single raw iterator and skips seeks when the iterator is already
/// positioned correctly (which happens when targets are sorted and adjacent in key order).
///
/// `targets` MUST be sorted by (address, `storage_key`) for correctness and optimal
/// performance (matches on-disk key order).
pub fn prune_storage_history_batch(
&mut self,
targets: &[((Address, B256), BlockNumber)],
) -> ProviderResult<PrunedIndices> {
if targets.is_empty() {
return Ok(PrunedIndices::default());
}
debug_assert!(
targets.windows(2).all(|w| w[0].0 <= w[1].0),
"prune_storage_history_batch: targets must be sorted by (address, storage_key)"
);
// StorageShardedKey layout: [address: 20][storage_key: 32][block: 8] = 60 bytes
// The first 52 bytes are the "prefix" that identifies (address, storage_key)
const PREFIX_LEN: usize = 52;
let cf = self.provider.get_cf_handle::<tables::StoragesHistory>()?;
let mut iter = self.provider.0.raw_iterator_cf(cf);
let mut outcomes = PrunedIndices::default();
for ((address, storage_key), to_block) in targets {
// Build the target prefix (first 52 bytes of encoded key)
let start_key = StorageShardedKey::new(*address, *storage_key, 0u64).encode();
let target_prefix = &start_key[..PREFIX_LEN];
// Check if we need to seek or if the iterator is already positioned correctly.
// After processing the previous target, the iterator is either:
// 1. Positioned at a key with a different prefix (we iterated past our shards)
// 2. Invalid (no more keys)
// If the current key's prefix >= our target prefix, we may be able to skip the seek.
let needs_seek = if iter.valid() {
if let Some(current_key) = iter.key() {
// If current key's prefix < target prefix, we need to seek forward
// If current key's prefix > target prefix, this target has no shards (skip)
// If current key's prefix == target prefix, we're already positioned
current_key.get(..PREFIX_LEN).is_none_or(|p| p < target_prefix)
} else {
true
}
} else {
true
};
if needs_seek {
iter.seek(start_key);
iter.status().map_err(|e| {
ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
message: e.to_string().into(),
code: -1,
}))
})?;
}
// Collect all shards for this (address, storage_key) pair using prefix comparison
let mut shards = Vec::new();
while iter.valid() {
let Some(key_bytes) = iter.key() else { break };
// Use raw prefix comparison instead of full decode for the prefix check
let current_prefix = key_bytes.get(..PREFIX_LEN);
if current_prefix != Some(target_prefix) {
break;
}
// Now decode the full key (we need the block number)
let key = StorageShardedKey::decode(key_bytes)
.map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
let Some(value_bytes) = iter.value() else { break };
let value = BlockNumberList::decompress(value_bytes)
.map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
shards.push((key, value));
iter.next();
}
// Use existing prune_history_shards_inner logic
match self.prune_history_shards_inner(
shards,
*to_block,
|key| key.sharded_key.highest_block_number,
|key| key.sharded_key.highest_block_number == u64::MAX,
|batch, key| batch.delete::<tables::StoragesHistory>(key),
|batch, key, value| batch.put::<tables::StoragesHistory>(key, value),
|| StorageShardedKey::last(*address, *storage_key),
)? {
PruneShardOutcome::Deleted => outcomes.deleted += 1,
PruneShardOutcome::Updated => outcomes.updated += 1,
PruneShardOutcome::Unchanged => outcomes.unchanged += 1,
}
}
Ok(outcomes)
}
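// Aside: the seek decision shared by both batch pruners, written out as a pure function.
// Seek only when the cursor is exhausted or still sits before the target prefix; if it
// already sits at or past the target prefix it can be reused as-is (a strictly greater
// prefix just means this target has no shards). The real code additionally checks
// `iter.status()` right after seeking.
fn needs_seek(current_key: Option<&[u8]>, target_prefix: &[u8]) -> bool {
    match current_key {
        // Cursor is invalid (no more keys): must seek.
        None => true,
        // Seek if the cursor's prefix sorts before the target prefix (or the key is too
        // short to carry a full prefix at all).
        Some(key) => key.get(..target_prefix.len()).is_none_or(|prefix| prefix < target_prefix),
    }
}

fn main() {
    let target = [0x02u8; 20];
    let behind = [0x01u8; 28];
    let on_target = [0x02u8; 28];
    let past = [0x03u8; 28];
    assert!(needs_seek(None, &target));                        // invalid cursor: seek
    assert!(needs_seek(Some(behind.as_slice()), &target));     // behind the target: seek
    assert!(!needs_seek(Some(on_target.as_slice()), &target)); // on the target: reuse
    assert!(!needs_seek(Some(past.as_slice()), &target));      // past the target: no shards
}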
/// Unwinds storage history to keep only blocks `<= keep_to`.
///
/// Handles multi-shard scenarios by:
@@ -2157,6 +2387,67 @@ impl Iterator for RocksDBIterEnum<'_> {
}
}
/// Wrapper enum for raw `RocksDB` iterators that works in both read-write and read-only modes.
///
/// Unlike [`RocksDBIterEnum`], raw iterators expose `seek()` for efficient repositioning
/// without reinitializing the iterator.
enum RocksDBRawIterEnum<'db> {
/// Raw iterator from read-write `OptimisticTransactionDB`.
ReadWrite(DBRawIteratorWithThreadMode<'db, OptimisticTransactionDB>),
/// Raw iterator from read-only `DB`.
ReadOnly(DBRawIteratorWithThreadMode<'db, DB>),
}
impl RocksDBRawIterEnum<'_> {
/// Positions the iterator at the first key >= `key`.
fn seek(&mut self, key: impl AsRef<[u8]>) {
match self {
Self::ReadWrite(iter) => iter.seek(key),
Self::ReadOnly(iter) => iter.seek(key),
}
}
/// Returns true if the iterator is positioned at a valid key-value pair.
fn valid(&self) -> bool {
match self {
Self::ReadWrite(iter) => iter.valid(),
Self::ReadOnly(iter) => iter.valid(),
}
}
/// Returns the current key, if valid.
fn key(&self) -> Option<&[u8]> {
match self {
Self::ReadWrite(iter) => iter.key(),
Self::ReadOnly(iter) => iter.key(),
}
}
/// Returns the current value, if valid.
fn value(&self) -> Option<&[u8]> {
match self {
Self::ReadWrite(iter) => iter.value(),
Self::ReadOnly(iter) => iter.value(),
}
}
/// Advances the iterator to the next key.
fn next(&mut self) {
match self {
Self::ReadWrite(iter) => iter.next(),
Self::ReadOnly(iter) => iter.next(),
}
}
/// Returns the status of the iterator.
fn status(&self) -> Result<(), rocksdb::Error> {
match self {
Self::ReadWrite(iter) => iter.status(),
Self::ReadOnly(iter) => iter.status(),
}
}
}
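// Hypothetical helper, not added by this diff, showing how the methods above compose into
// the prefix scan used by both batch pruners: seek once, then walk forward until the key
// leaves the prefix range. (The production code also propagates `status()` after the seek
// and decodes keys/values instead of copying raw bytes.)
fn scan_prefix(iter: &mut RocksDBRawIterEnum<'_>, prefix: &[u8]) -> Vec<(Vec<u8>, Vec<u8>)> {
    let mut entries = Vec::new();
    iter.seek(prefix);
    while iter.valid() {
        let Some(key) = iter.key() else { break };
        // Stop as soon as the current key no longer starts with `prefix`.
        if key.get(..prefix.len()) != Some(prefix) {
            break;
        }
        let Some(value) = iter.value() else { break };
        entries.push((key.to_vec(), value.to_vec()));
        iter.next();
    }
    entries
}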
/// Iterator over a `RocksDB` table (non-transactional).
///
/// Yields decoded `(Key, Value)` pairs in key order.
@@ -3584,4 +3875,147 @@ mod tests {
}
}
}
#[test]
fn test_prune_account_history_batch_multiple_sorted_targets() {
let temp_dir = TempDir::new().unwrap();
let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
let addr1 = Address::from([0x01; 20]);
let addr2 = Address::from([0x02; 20]);
let addr3 = Address::from([0x03; 20]);
// Setup shards for each address
let mut batch = provider.batch();
batch
.put::<tables::AccountsHistory>(
ShardedKey::new(addr1, u64::MAX),
&BlockNumberList::new_pre_sorted([10, 20, 30]),
)
.unwrap();
batch
.put::<tables::AccountsHistory>(
ShardedKey::new(addr2, u64::MAX),
&BlockNumberList::new_pre_sorted([5, 10, 15]),
)
.unwrap();
batch
.put::<tables::AccountsHistory>(
ShardedKey::new(addr3, u64::MAX),
&BlockNumberList::new_pre_sorted([100, 200]),
)
.unwrap();
batch.commit().unwrap();
// Prune all three (sorted by address)
let mut targets = vec![(addr1, 15), (addr2, 10), (addr3, 50)];
targets.sort_by_key(|(addr, _)| *addr);
let mut batch = provider.batch();
let outcomes = batch.prune_account_history_batch(&targets).unwrap();
batch.commit().unwrap();
// addr1: prune <=15, keep [20, 30] -> updated
// addr2: prune <=10, keep [15] -> updated
// addr3: prune <=50, keep [100, 200] -> unchanged
assert_eq!(outcomes.updated, 2);
assert_eq!(outcomes.unchanged, 1);
let shards1 = provider.account_history_shards(addr1).unwrap();
assert_eq!(shards1[0].1.iter().collect::<Vec<_>>(), vec![20, 30]);
let shards2 = provider.account_history_shards(addr2).unwrap();
assert_eq!(shards2[0].1.iter().collect::<Vec<_>>(), vec![15]);
let shards3 = provider.account_history_shards(addr3).unwrap();
assert_eq!(shards3[0].1.iter().collect::<Vec<_>>(), vec![100, 200]);
}
#[test]
fn test_prune_account_history_batch_target_with_no_shards() {
let temp_dir = TempDir::new().unwrap();
let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
let addr1 = Address::from([0x01; 20]);
let addr2 = Address::from([0x02; 20]); // No shards for this one
let addr3 = Address::from([0x03; 20]);
// Only setup shards for addr1 and addr3
let mut batch = provider.batch();
batch
.put::<tables::AccountsHistory>(
ShardedKey::new(addr1, u64::MAX),
&BlockNumberList::new_pre_sorted([10, 20]),
)
.unwrap();
batch
.put::<tables::AccountsHistory>(
ShardedKey::new(addr3, u64::MAX),
&BlockNumberList::new_pre_sorted([30, 40]),
)
.unwrap();
batch.commit().unwrap();
// Prune all three (addr2 has no shards - tests p > target_prefix case)
let mut targets = vec![(addr1, 15), (addr2, 100), (addr3, 35)];
targets.sort_by_key(|(addr, _)| *addr);
let mut batch = provider.batch();
let outcomes = batch.prune_account_history_batch(&targets).unwrap();
batch.commit().unwrap();
// addr1: updated (keep [20])
// addr2: unchanged (no shards)
// addr3: updated (keep [40])
assert_eq!(outcomes.updated, 2);
assert_eq!(outcomes.unchanged, 1);
let shards1 = provider.account_history_shards(addr1).unwrap();
assert_eq!(shards1[0].1.iter().collect::<Vec<_>>(), vec![20]);
let shards3 = provider.account_history_shards(addr3).unwrap();
assert_eq!(shards3[0].1.iter().collect::<Vec<_>>(), vec![40]);
}
#[test]
fn test_prune_storage_history_batch_multiple_sorted_targets() {
let temp_dir = TempDir::new().unwrap();
let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
let addr = Address::from([0x42; 20]);
let slot1 = B256::from([0x01; 32]);
let slot2 = B256::from([0x02; 32]);
// Setup shards
let mut batch = provider.batch();
batch
.put::<tables::StoragesHistory>(
StorageShardedKey::new(addr, slot1, u64::MAX),
&BlockNumberList::new_pre_sorted([10, 20, 30]),
)
.unwrap();
batch
.put::<tables::StoragesHistory>(
StorageShardedKey::new(addr, slot2, u64::MAX),
&BlockNumberList::new_pre_sorted([5, 15, 25]),
)
.unwrap();
batch.commit().unwrap();
// Prune both (sorted)
let mut targets = vec![((addr, slot1), 15), ((addr, slot2), 10)];
targets.sort_by_key(|((a, s), _)| (*a, *s));
let mut batch = provider.batch();
let outcomes = batch.prune_storage_history_batch(&targets).unwrap();
batch.commit().unwrap();
assert_eq!(outcomes.updated, 2);
let shards1 = provider.storage_history_shards(addr, slot1).unwrap();
assert_eq!(shards1[0].1.iter().collect::<Vec<_>>(), vec![20, 30]);
let shards2 = provider.storage_history_shards(addr, slot2).unwrap();
assert_eq!(shards2[0].1.iter().collect::<Vec<_>>(), vec![15, 25]);
}
}

View File

@@ -219,3 +219,14 @@ pub enum PruneShardOutcome {
/// Shard was unchanged (no blocks <= `to_block`).
Unchanged,
}
/// Tracks pruning outcomes for batch operations (stub).
#[derive(Debug, Default, Clone, Copy)]
pub struct PrunedIndices {
/// Number of shards completely deleted.
pub deleted: usize,
/// Number of shards that were updated (filtered but still have entries).
pub updated: usize,
/// Number of shards that were unchanged.
pub unchanged: usize,
}