Compare commits

...

6 Commits

Author SHA1 Message Date
yongkangc
903c58749d refactor(rocksdb): extract shared unwind_history_shards_inner helper
Deduplicates the unwind shard processing logic between account and
storage history. Both unwind_account_history_to/batch and
unwind_storage_history_to/batch now delegate to a single generic
unwind_history_shards_inner method using closures, following the
same pattern as prune_history_shards_inner.

Amp-Thread-ID: https://ampcode.com/threads/T-019c436b-33ce-739d-8fa2-69ea27d4fc4a
2026-02-09 17:33:18 +00:00
yongkangc
d1dfce17ab docs: restore full doc comments for RocksDBRawIterEnum and decode_iter_item
Amp-Thread-ID: https://ampcode.com/threads/T-019c28b3-097d-741f-8a27-9af9a4f1ada1
2026-02-04 12:50:28 +00:00
yongkangc
abe603b94c docs: restore detailed flush_and_compact doc comment
Amp-Thread-ID: https://ampcode.com/threads/T-019c27a1-4820-715a-9b42-18fa2ce05c98
2026-02-04 08:11:28 +00:00
yongkangc
9258bcdc7b fix: check iter.status() after iteration loops
Amp-Thread-ID: https://ampcode.com/threads/T-019c27a1-4820-715a-9b42-18fa2ce05c98
2026-02-04 08:09:33 +00:00
yongkangc
7134ae4ec7 chore: fix clippy warnings and clean up comments
Amp-Thread-ID: https://ampcode.com/threads/T-019c2769-c7e2-7217-ae23-b189345c2c4b
2026-02-04 07:50:25 +00:00
yongkangc
d9f81bc104 perf(rocksdb): use single iterator for batch history unwind
Previously, unwind_account_history_indices and unwind_storage_history_indices
created a new iterator_cf for every address/storage key by calling
account_history_shards/storage_history_shards in a loop. This is the same
performance antipattern fixed in PR #21767 for pruning.

This commit adds:
- unwind_account_history_batch: batch version using single raw_iterator_cf
- unwind_storage_history_batch: batch version using single raw_iterator_cf
- RocksDBRawIterEnum: wrapper supporting seek() for iterator reuse

The batch methods reuse a single raw iterator and skip seeks when the
iterator is already positioned correctly (for sorted targets in key order).
This significantly reduces RocksDB seek overhead for large unwind operations.

Amp-Thread-ID: https://ampcode.com/threads/T-019c272c-eed5-751b-a0c4-8e260f96a3bc
2026-02-04 07:50:12 +00:00

View File

@@ -1098,13 +1098,14 @@ impl RocksDBProvider {
.or_insert(block_number);
}
let mut targets: Vec<_> = address_min_block
.into_iter()
.map(|(address, min_block)| (address, min_block.checked_sub(1)))
.collect();
targets.sort_unstable_by_key(|(addr, _)| *addr);
let mut batch = self.batch();
for (address, min_block) in address_min_block {
match min_block.checked_sub(1) {
Some(keep_to) => batch.unwind_account_history_to(address, keep_to)?,
None => batch.clear_account_history(address)?,
}
}
batch.unwind_account_history_batch(&targets)?;
Ok(batch.into_inner())
}
@@ -1129,13 +1130,14 @@ impl RocksDBProvider {
.or_insert(block_number);
}
let mut targets: Vec<_> = key_min_block
.into_iter()
.map(|(key, min_block)| (key, min_block.checked_sub(1)))
.collect();
targets.sort_unstable_by_key(|(key, _)| *key);
let mut batch = self.batch();
for ((address, storage_key), min_block) in key_min_block {
match min_block.checked_sub(1) {
Some(keep_to) => batch.unwind_storage_history_to(address, storage_key, keep_to)?,
None => batch.clear_storage_history(address, storage_key)?,
}
}
batch.unwind_storage_history_batch(&targets)?;
Ok(batch.into_inner())
}
@@ -1616,64 +1618,281 @@ impl<'a> RocksDBBatch<'a> {
keep_to: BlockNumber,
) -> ProviderResult<()> {
let shards = self.provider.account_history_shards(address)?;
self.unwind_history_shards_inner(
shards,
keep_to,
|key| key.highest_block_number,
|key| key.highest_block_number == u64::MAX,
|batch, key| batch.delete::<tables::AccountsHistory>(key),
|batch, key, value| batch.put::<tables::AccountsHistory>(key, value),
|| ShardedKey::new(address, u64::MAX),
)
}
/// Unwinds account history for multiple addresses in a single iterator pass.
///
/// `targets` MUST be sorted by address for correctness.
/// `None` means clear all history for that address.
pub fn unwind_account_history_batch(
&mut self,
targets: &[(Address, Option<BlockNumber>)],
) -> ProviderResult<()> {
if targets.is_empty() {
return Ok(());
}
debug_assert!(
targets.windows(2).all(|w| w[0].0 <= w[1].0),
"targets must be sorted by address for correctness"
);
const PREFIX_LEN: usize = 20;
let cf = self.provider.get_cf_handle::<tables::AccountsHistory>()?;
let mut iter = self.provider.0.raw_iterator_cf(cf);
for (address, keep_to) in targets {
let start_key = ShardedKey::new(*address, 0u64).encode();
let target_prefix = &start_key[..PREFIX_LEN];
let needs_seek = if iter.valid() {
if let Some(current_key) = iter.key() {
current_key.get(..PREFIX_LEN).is_none_or(|p| p < target_prefix)
} else {
true
}
} else {
true
};
if needs_seek {
iter.seek(start_key);
iter.status().map_err(|e| {
ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
message: e.to_string().into(),
code: -1,
}))
})?;
}
let mut shards = Vec::new();
while iter.valid() {
let Some(key_bytes) = iter.key() else { break };
let current_prefix = key_bytes.get(..PREFIX_LEN);
if current_prefix != Some(target_prefix) {
break;
}
let key = ShardedKey::<Address>::decode(key_bytes)
.map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
let Some(value_bytes) = iter.value() else { break };
let value = BlockNumberList::decompress(value_bytes)
.map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
shards.push((key, value));
iter.next();
}
iter.status().map_err(|e| {
ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
message: e.to_string().into(),
code: -1,
}))
})?;
if shards.is_empty() {
continue;
}
let addr = *address;
match keep_to {
Some(keep_to) => {
self.unwind_history_shards_inner(
shards,
*keep_to,
|key| key.highest_block_number,
|key| key.highest_block_number == u64::MAX,
|batch, key| batch.delete::<tables::AccountsHistory>(key),
|batch, key, value| batch.put::<tables::AccountsHistory>(key, value),
|| ShardedKey::new(addr, u64::MAX),
)?;
}
None => {
for (key, _) in shards {
self.delete::<tables::AccountsHistory>(key)?;
}
}
}
}
Ok(())
}
/// Unwinds history shards, keeping only blocks <= `keep_to`.
///
/// Generic implementation for both account and storage history unwinding.
/// Mirrors MDBX `unwind_history_shards` semantics:
/// - Deletes shards entirely above `keep_to`
/// - Truncates the boundary shard and re-keys to `u64::MAX` sentinel
/// - Preserves shards entirely below `keep_to`
#[allow(clippy::too_many_arguments)]
fn unwind_history_shards_inner<K>(
&mut self,
shards: Vec<(K, BlockNumberList)>,
keep_to: BlockNumber,
get_highest: impl Fn(&K) -> u64,
is_sentinel: impl Fn(&K) -> bool,
delete_shard: impl Fn(&mut Self, K) -> ProviderResult<()>,
put_shard: impl Fn(&mut Self, K, &BlockNumberList) -> ProviderResult<()>,
create_sentinel: impl Fn() -> K,
) -> ProviderResult<()>
where
K: Clone,
{
if shards.is_empty() {
return Ok(());
}
// Find the first shard that might contain blocks > keep_to.
// A shard is affected if it's the sentinel (u64::MAX) or its highest_block_number > keep_to
let boundary_idx = shards.iter().position(|(key, _)| {
key.highest_block_number == u64::MAX || key.highest_block_number > keep_to
});
let boundary_idx =
shards.iter().position(|(key, _)| is_sentinel(key) || get_highest(key) > keep_to);
// Repair path: no shards affected means all blocks <= keep_to, just ensure sentinel exists
let Some(boundary_idx) = boundary_idx else {
let (last_key, last_value) = shards.last().expect("shards is non-empty");
if last_key.highest_block_number != u64::MAX {
self.delete::<tables::AccountsHistory>(last_key.clone())?;
self.put::<tables::AccountsHistory>(
ShardedKey::new(address, u64::MAX),
last_value,
)?;
if !is_sentinel(last_key) {
delete_shard(self, last_key.clone())?;
put_shard(self, create_sentinel(), last_value)?;
}
return Ok(());
};
// Delete all shards strictly after the boundary (they are entirely > keep_to)
for (key, _) in shards.iter().skip(boundary_idx + 1) {
self.delete::<tables::AccountsHistory>(key.clone())?;
delete_shard(self, key.clone())?;
}
// Process the boundary shard: filter out blocks > keep_to
let (boundary_key, boundary_list) = &shards[boundary_idx];
delete_shard(self, boundary_key.clone())?;
// Delete the boundary shard (we'll either drop it or rewrite at u64::MAX)
self.delete::<tables::AccountsHistory>(boundary_key.clone())?;
// Build truncated list once; check emptiness directly (avoids double iteration)
let new_last =
BlockNumberList::new_pre_sorted(boundary_list.iter().take_while(|&b| b <= keep_to));
if new_last.is_empty() {
// Boundary shard is now empty. Previous shard becomes the last and must be keyed
// u64::MAX.
if boundary_idx == 0 {
// Nothing left for this address
return Ok(());
}
let (prev_key, prev_value) = &shards[boundary_idx - 1];
if prev_key.highest_block_number != u64::MAX {
self.delete::<tables::AccountsHistory>(prev_key.clone())?;
self.put::<tables::AccountsHistory>(
ShardedKey::new(address, u64::MAX),
prev_value,
)?;
if !is_sentinel(prev_key) {
delete_shard(self, prev_key.clone())?;
put_shard(self, create_sentinel(), prev_value)?;
}
return Ok(());
}
self.put::<tables::AccountsHistory>(ShardedKey::new(address, u64::MAX), &new_last)?;
put_shard(self, create_sentinel(), &new_last)?;
Ok(())
}
/// Unwinds storage history for multiple (address, `storage_key`) pairs in a single pass.
///
/// `targets` MUST be sorted by (address, `storage_key`) for correctness.
/// `None` means clear all history for that slot.
pub fn unwind_storage_history_batch(
&mut self,
targets: &[((Address, B256), Option<BlockNumber>)],
) -> ProviderResult<()> {
if targets.is_empty() {
return Ok(());
}
debug_assert!(
targets.windows(2).all(|w| w[0].0 <= w[1].0),
"targets must be sorted by (address, storage_key) for correctness"
);
const PREFIX_LEN: usize = 52;
let cf = self.provider.get_cf_handle::<tables::StoragesHistory>()?;
let mut iter = self.provider.0.raw_iterator_cf(cf);
for ((address, storage_key), keep_to) in targets {
let start_key = StorageShardedKey::new(*address, *storage_key, 0u64).encode();
let target_prefix = &start_key[..PREFIX_LEN];
let needs_seek = if iter.valid() {
if let Some(current_key) = iter.key() {
current_key.get(..PREFIX_LEN).is_none_or(|p| p < target_prefix)
} else {
true
}
} else {
true
};
if needs_seek {
iter.seek(start_key);
iter.status().map_err(|e| {
ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
message: e.to_string().into(),
code: -1,
}))
})?;
}
let mut shards = Vec::new();
while iter.valid() {
let Some(key_bytes) = iter.key() else { break };
let current_prefix = key_bytes.get(..PREFIX_LEN);
if current_prefix != Some(target_prefix) {
break;
}
let key = StorageShardedKey::decode(key_bytes)
.map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
let Some(value_bytes) = iter.value() else { break };
let value = BlockNumberList::decompress(value_bytes)
.map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
shards.push((key, value));
iter.next();
}
iter.status().map_err(|e| {
ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
message: e.to_string().into(),
code: -1,
}))
})?;
if shards.is_empty() {
continue;
}
let addr = *address;
let skey = *storage_key;
match keep_to {
Some(keep_to) => {
self.unwind_history_shards_inner(
shards,
*keep_to,
|key| key.sharded_key.highest_block_number,
|key| key.sharded_key.highest_block_number == u64::MAX,
|batch, key| batch.delete::<tables::StoragesHistory>(key),
|batch, key, value| batch.put::<tables::StoragesHistory>(key, value),
|| StorageShardedKey::last(addr, skey),
)?;
}
None => {
for (key, _) in shards {
self.delete::<tables::StoragesHistory>(key)?;
}
}
}
}
Ok(())
}
@@ -2011,70 +2230,15 @@ impl<'a> RocksDBBatch<'a> {
keep_to: BlockNumber,
) -> ProviderResult<()> {
let shards = self.provider.storage_history_shards(address, storage_key)?;
if shards.is_empty() {
return Ok(());
}
// Find the first shard that might contain blocks > keep_to.
// A shard is affected if it's the sentinel (u64::MAX) or its highest_block_number > keep_to
let boundary_idx = shards.iter().position(|(key, _)| {
key.sharded_key.highest_block_number == u64::MAX ||
key.sharded_key.highest_block_number > keep_to
});
// Repair path: no shards affected means all blocks <= keep_to, just ensure sentinel exists
let Some(boundary_idx) = boundary_idx else {
let (last_key, last_value) = shards.last().expect("shards is non-empty");
if last_key.sharded_key.highest_block_number != u64::MAX {
self.delete::<tables::StoragesHistory>(last_key.clone())?;
self.put::<tables::StoragesHistory>(
StorageShardedKey::last(address, storage_key),
last_value,
)?;
}
return Ok(());
};
// Delete all shards strictly after the boundary (they are entirely > keep_to)
for (key, _) in shards.iter().skip(boundary_idx + 1) {
self.delete::<tables::StoragesHistory>(key.clone())?;
}
// Process the boundary shard: filter out blocks > keep_to
let (boundary_key, boundary_list) = &shards[boundary_idx];
// Delete the boundary shard (we'll either drop it or rewrite at u64::MAX)
self.delete::<tables::StoragesHistory>(boundary_key.clone())?;
// Build truncated list once; check emptiness directly (avoids double iteration)
let new_last =
BlockNumberList::new_pre_sorted(boundary_list.iter().take_while(|&b| b <= keep_to));
if new_last.is_empty() {
// Boundary shard is now empty. Previous shard becomes the last and must be keyed
// u64::MAX.
if boundary_idx == 0 {
// Nothing left for this (address, storage_key) pair
return Ok(());
}
let (prev_key, prev_value) = &shards[boundary_idx - 1];
if prev_key.sharded_key.highest_block_number != u64::MAX {
self.delete::<tables::StoragesHistory>(prev_key.clone())?;
self.put::<tables::StoragesHistory>(
StorageShardedKey::last(address, storage_key),
prev_value,
)?;
}
return Ok(());
}
self.put::<tables::StoragesHistory>(
StorageShardedKey::last(address, storage_key),
&new_last,
)?;
Ok(())
self.unwind_history_shards_inner(
shards,
keep_to,
|key| key.sharded_key.highest_block_number,
|key| key.sharded_key.highest_block_number == u64::MAX,
|batch, key| batch.delete::<tables::StoragesHistory>(key),
|batch, key, value| batch.put::<tables::StoragesHistory>(key, value),
|| StorageShardedKey::last(address, storage_key),
)
}
/// Clears all account history shards for the given address.