mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-04-30 03:01:58 -04:00
Compare commits
6 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
903c58749d | ||
|
|
d1dfce17ab | ||
|
|
abe603b94c | ||
|
|
9258bcdc7b | ||
|
|
7134ae4ec7 | ||
|
|
d9f81bc104 |
@@ -1098,13 +1098,14 @@ impl RocksDBProvider {
|
||||
.or_insert(block_number);
|
||||
}
|
||||
|
||||
let mut targets: Vec<_> = address_min_block
|
||||
.into_iter()
|
||||
.map(|(address, min_block)| (address, min_block.checked_sub(1)))
|
||||
.collect();
|
||||
targets.sort_unstable_by_key(|(addr, _)| *addr);
|
||||
|
||||
let mut batch = self.batch();
|
||||
for (address, min_block) in address_min_block {
|
||||
match min_block.checked_sub(1) {
|
||||
Some(keep_to) => batch.unwind_account_history_to(address, keep_to)?,
|
||||
None => batch.clear_account_history(address)?,
|
||||
}
|
||||
}
|
||||
batch.unwind_account_history_batch(&targets)?;
|
||||
|
||||
Ok(batch.into_inner())
|
||||
}
|
||||
@@ -1129,13 +1130,14 @@ impl RocksDBProvider {
|
||||
.or_insert(block_number);
|
||||
}
|
||||
|
||||
let mut targets: Vec<_> = key_min_block
|
||||
.into_iter()
|
||||
.map(|(key, min_block)| (key, min_block.checked_sub(1)))
|
||||
.collect();
|
||||
targets.sort_unstable_by_key(|(key, _)| *key);
|
||||
|
||||
let mut batch = self.batch();
|
||||
for ((address, storage_key), min_block) in key_min_block {
|
||||
match min_block.checked_sub(1) {
|
||||
Some(keep_to) => batch.unwind_storage_history_to(address, storage_key, keep_to)?,
|
||||
None => batch.clear_storage_history(address, storage_key)?,
|
||||
}
|
||||
}
|
||||
batch.unwind_storage_history_batch(&targets)?;
|
||||
|
||||
Ok(batch.into_inner())
|
||||
}
|
||||
@@ -1616,64 +1618,281 @@ impl<'a> RocksDBBatch<'a> {
|
||||
keep_to: BlockNumber,
|
||||
) -> ProviderResult<()> {
|
||||
let shards = self.provider.account_history_shards(address)?;
|
||||
self.unwind_history_shards_inner(
|
||||
shards,
|
||||
keep_to,
|
||||
|key| key.highest_block_number,
|
||||
|key| key.highest_block_number == u64::MAX,
|
||||
|batch, key| batch.delete::<tables::AccountsHistory>(key),
|
||||
|batch, key, value| batch.put::<tables::AccountsHistory>(key, value),
|
||||
|| ShardedKey::new(address, u64::MAX),
|
||||
)
|
||||
}
|
||||
|
||||
/// Unwinds account history for multiple addresses in a single iterator pass.
|
||||
///
|
||||
/// `targets` MUST be sorted by address for correctness.
|
||||
/// `None` means clear all history for that address.
|
||||
pub fn unwind_account_history_batch(
|
||||
&mut self,
|
||||
targets: &[(Address, Option<BlockNumber>)],
|
||||
) -> ProviderResult<()> {
|
||||
if targets.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
debug_assert!(
|
||||
targets.windows(2).all(|w| w[0].0 <= w[1].0),
|
||||
"targets must be sorted by address for correctness"
|
||||
);
|
||||
|
||||
const PREFIX_LEN: usize = 20;
|
||||
|
||||
let cf = self.provider.get_cf_handle::<tables::AccountsHistory>()?;
|
||||
let mut iter = self.provider.0.raw_iterator_cf(cf);
|
||||
|
||||
for (address, keep_to) in targets {
|
||||
let start_key = ShardedKey::new(*address, 0u64).encode();
|
||||
let target_prefix = &start_key[..PREFIX_LEN];
|
||||
|
||||
let needs_seek = if iter.valid() {
|
||||
if let Some(current_key) = iter.key() {
|
||||
current_key.get(..PREFIX_LEN).is_none_or(|p| p < target_prefix)
|
||||
} else {
|
||||
true
|
||||
}
|
||||
} else {
|
||||
true
|
||||
};
|
||||
|
||||
if needs_seek {
|
||||
iter.seek(start_key);
|
||||
iter.status().map_err(|e| {
|
||||
ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
|
||||
message: e.to_string().into(),
|
||||
code: -1,
|
||||
}))
|
||||
})?;
|
||||
}
|
||||
|
||||
let mut shards = Vec::new();
|
||||
while iter.valid() {
|
||||
let Some(key_bytes) = iter.key() else { break };
|
||||
|
||||
let current_prefix = key_bytes.get(..PREFIX_LEN);
|
||||
if current_prefix != Some(target_prefix) {
|
||||
break;
|
||||
}
|
||||
|
||||
let key = ShardedKey::<Address>::decode(key_bytes)
|
||||
.map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
|
||||
|
||||
let Some(value_bytes) = iter.value() else { break };
|
||||
let value = BlockNumberList::decompress(value_bytes)
|
||||
.map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
|
||||
|
||||
shards.push((key, value));
|
||||
iter.next();
|
||||
}
|
||||
|
||||
iter.status().map_err(|e| {
|
||||
ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
|
||||
message: e.to_string().into(),
|
||||
code: -1,
|
||||
}))
|
||||
})?;
|
||||
|
||||
if shards.is_empty() {
|
||||
continue;
|
||||
}
|
||||
|
||||
let addr = *address;
|
||||
match keep_to {
|
||||
Some(keep_to) => {
|
||||
self.unwind_history_shards_inner(
|
||||
shards,
|
||||
*keep_to,
|
||||
|key| key.highest_block_number,
|
||||
|key| key.highest_block_number == u64::MAX,
|
||||
|batch, key| batch.delete::<tables::AccountsHistory>(key),
|
||||
|batch, key, value| batch.put::<tables::AccountsHistory>(key, value),
|
||||
|| ShardedKey::new(addr, u64::MAX),
|
||||
)?;
|
||||
}
|
||||
None => {
|
||||
for (key, _) in shards {
|
||||
self.delete::<tables::AccountsHistory>(key)?;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Unwinds history shards, keeping only blocks <= `keep_to`.
|
||||
///
|
||||
/// Generic implementation for both account and storage history unwinding.
|
||||
/// Mirrors MDBX `unwind_history_shards` semantics:
|
||||
/// - Deletes shards entirely above `keep_to`
|
||||
/// - Truncates the boundary shard and re-keys to `u64::MAX` sentinel
|
||||
/// - Preserves shards entirely below `keep_to`
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
fn unwind_history_shards_inner<K>(
|
||||
&mut self,
|
||||
shards: Vec<(K, BlockNumberList)>,
|
||||
keep_to: BlockNumber,
|
||||
get_highest: impl Fn(&K) -> u64,
|
||||
is_sentinel: impl Fn(&K) -> bool,
|
||||
delete_shard: impl Fn(&mut Self, K) -> ProviderResult<()>,
|
||||
put_shard: impl Fn(&mut Self, K, &BlockNumberList) -> ProviderResult<()>,
|
||||
create_sentinel: impl Fn() -> K,
|
||||
) -> ProviderResult<()>
|
||||
where
|
||||
K: Clone,
|
||||
{
|
||||
if shards.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// Find the first shard that might contain blocks > keep_to.
|
||||
// A shard is affected if it's the sentinel (u64::MAX) or its highest_block_number > keep_to
|
||||
let boundary_idx = shards.iter().position(|(key, _)| {
|
||||
key.highest_block_number == u64::MAX || key.highest_block_number > keep_to
|
||||
});
|
||||
let boundary_idx =
|
||||
shards.iter().position(|(key, _)| is_sentinel(key) || get_highest(key) > keep_to);
|
||||
|
||||
// Repair path: no shards affected means all blocks <= keep_to, just ensure sentinel exists
|
||||
let Some(boundary_idx) = boundary_idx else {
|
||||
let (last_key, last_value) = shards.last().expect("shards is non-empty");
|
||||
if last_key.highest_block_number != u64::MAX {
|
||||
self.delete::<tables::AccountsHistory>(last_key.clone())?;
|
||||
self.put::<tables::AccountsHistory>(
|
||||
ShardedKey::new(address, u64::MAX),
|
||||
last_value,
|
||||
)?;
|
||||
if !is_sentinel(last_key) {
|
||||
delete_shard(self, last_key.clone())?;
|
||||
put_shard(self, create_sentinel(), last_value)?;
|
||||
}
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
// Delete all shards strictly after the boundary (they are entirely > keep_to)
|
||||
for (key, _) in shards.iter().skip(boundary_idx + 1) {
|
||||
self.delete::<tables::AccountsHistory>(key.clone())?;
|
||||
delete_shard(self, key.clone())?;
|
||||
}
|
||||
|
||||
// Process the boundary shard: filter out blocks > keep_to
|
||||
let (boundary_key, boundary_list) = &shards[boundary_idx];
|
||||
delete_shard(self, boundary_key.clone())?;
|
||||
|
||||
// Delete the boundary shard (we'll either drop it or rewrite at u64::MAX)
|
||||
self.delete::<tables::AccountsHistory>(boundary_key.clone())?;
|
||||
|
||||
// Build truncated list once; check emptiness directly (avoids double iteration)
|
||||
let new_last =
|
||||
BlockNumberList::new_pre_sorted(boundary_list.iter().take_while(|&b| b <= keep_to));
|
||||
|
||||
if new_last.is_empty() {
|
||||
// Boundary shard is now empty. Previous shard becomes the last and must be keyed
|
||||
// u64::MAX.
|
||||
if boundary_idx == 0 {
|
||||
// Nothing left for this address
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let (prev_key, prev_value) = &shards[boundary_idx - 1];
|
||||
if prev_key.highest_block_number != u64::MAX {
|
||||
self.delete::<tables::AccountsHistory>(prev_key.clone())?;
|
||||
self.put::<tables::AccountsHistory>(
|
||||
ShardedKey::new(address, u64::MAX),
|
||||
prev_value,
|
||||
)?;
|
||||
if !is_sentinel(prev_key) {
|
||||
delete_shard(self, prev_key.clone())?;
|
||||
put_shard(self, create_sentinel(), prev_value)?;
|
||||
}
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
self.put::<tables::AccountsHistory>(ShardedKey::new(address, u64::MAX), &new_last)?;
|
||||
put_shard(self, create_sentinel(), &new_last)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Unwinds storage history for multiple (address, `storage_key`) pairs in a single pass.
|
||||
///
|
||||
/// `targets` MUST be sorted by (address, `storage_key`) for correctness.
|
||||
/// `None` means clear all history for that slot.
|
||||
pub fn unwind_storage_history_batch(
|
||||
&mut self,
|
||||
targets: &[((Address, B256), Option<BlockNumber>)],
|
||||
) -> ProviderResult<()> {
|
||||
if targets.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
debug_assert!(
|
||||
targets.windows(2).all(|w| w[0].0 <= w[1].0),
|
||||
"targets must be sorted by (address, storage_key) for correctness"
|
||||
);
|
||||
|
||||
const PREFIX_LEN: usize = 52;
|
||||
|
||||
let cf = self.provider.get_cf_handle::<tables::StoragesHistory>()?;
|
||||
let mut iter = self.provider.0.raw_iterator_cf(cf);
|
||||
|
||||
for ((address, storage_key), keep_to) in targets {
|
||||
let start_key = StorageShardedKey::new(*address, *storage_key, 0u64).encode();
|
||||
let target_prefix = &start_key[..PREFIX_LEN];
|
||||
|
||||
let needs_seek = if iter.valid() {
|
||||
if let Some(current_key) = iter.key() {
|
||||
current_key.get(..PREFIX_LEN).is_none_or(|p| p < target_prefix)
|
||||
} else {
|
||||
true
|
||||
}
|
||||
} else {
|
||||
true
|
||||
};
|
||||
|
||||
if needs_seek {
|
||||
iter.seek(start_key);
|
||||
iter.status().map_err(|e| {
|
||||
ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
|
||||
message: e.to_string().into(),
|
||||
code: -1,
|
||||
}))
|
||||
})?;
|
||||
}
|
||||
|
||||
let mut shards = Vec::new();
|
||||
while iter.valid() {
|
||||
let Some(key_bytes) = iter.key() else { break };
|
||||
|
||||
let current_prefix = key_bytes.get(..PREFIX_LEN);
|
||||
if current_prefix != Some(target_prefix) {
|
||||
break;
|
||||
}
|
||||
|
||||
let key = StorageShardedKey::decode(key_bytes)
|
||||
.map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
|
||||
|
||||
let Some(value_bytes) = iter.value() else { break };
|
||||
let value = BlockNumberList::decompress(value_bytes)
|
||||
.map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
|
||||
|
||||
shards.push((key, value));
|
||||
iter.next();
|
||||
}
|
||||
|
||||
iter.status().map_err(|e| {
|
||||
ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
|
||||
message: e.to_string().into(),
|
||||
code: -1,
|
||||
}))
|
||||
})?;
|
||||
|
||||
if shards.is_empty() {
|
||||
continue;
|
||||
}
|
||||
|
||||
let addr = *address;
|
||||
let skey = *storage_key;
|
||||
match keep_to {
|
||||
Some(keep_to) => {
|
||||
self.unwind_history_shards_inner(
|
||||
shards,
|
||||
*keep_to,
|
||||
|key| key.sharded_key.highest_block_number,
|
||||
|key| key.sharded_key.highest_block_number == u64::MAX,
|
||||
|batch, key| batch.delete::<tables::StoragesHistory>(key),
|
||||
|batch, key, value| batch.put::<tables::StoragesHistory>(key, value),
|
||||
|| StorageShardedKey::last(addr, skey),
|
||||
)?;
|
||||
}
|
||||
None => {
|
||||
for (key, _) in shards {
|
||||
self.delete::<tables::StoragesHistory>(key)?;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -2011,70 +2230,15 @@ impl<'a> RocksDBBatch<'a> {
|
||||
keep_to: BlockNumber,
|
||||
) -> ProviderResult<()> {
|
||||
let shards = self.provider.storage_history_shards(address, storage_key)?;
|
||||
if shards.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// Find the first shard that might contain blocks > keep_to.
|
||||
// A shard is affected if it's the sentinel (u64::MAX) or its highest_block_number > keep_to
|
||||
let boundary_idx = shards.iter().position(|(key, _)| {
|
||||
key.sharded_key.highest_block_number == u64::MAX ||
|
||||
key.sharded_key.highest_block_number > keep_to
|
||||
});
|
||||
|
||||
// Repair path: no shards affected means all blocks <= keep_to, just ensure sentinel exists
|
||||
let Some(boundary_idx) = boundary_idx else {
|
||||
let (last_key, last_value) = shards.last().expect("shards is non-empty");
|
||||
if last_key.sharded_key.highest_block_number != u64::MAX {
|
||||
self.delete::<tables::StoragesHistory>(last_key.clone())?;
|
||||
self.put::<tables::StoragesHistory>(
|
||||
StorageShardedKey::last(address, storage_key),
|
||||
last_value,
|
||||
)?;
|
||||
}
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
// Delete all shards strictly after the boundary (they are entirely > keep_to)
|
||||
for (key, _) in shards.iter().skip(boundary_idx + 1) {
|
||||
self.delete::<tables::StoragesHistory>(key.clone())?;
|
||||
}
|
||||
|
||||
// Process the boundary shard: filter out blocks > keep_to
|
||||
let (boundary_key, boundary_list) = &shards[boundary_idx];
|
||||
|
||||
// Delete the boundary shard (we'll either drop it or rewrite at u64::MAX)
|
||||
self.delete::<tables::StoragesHistory>(boundary_key.clone())?;
|
||||
|
||||
// Build truncated list once; check emptiness directly (avoids double iteration)
|
||||
let new_last =
|
||||
BlockNumberList::new_pre_sorted(boundary_list.iter().take_while(|&b| b <= keep_to));
|
||||
|
||||
if new_last.is_empty() {
|
||||
// Boundary shard is now empty. Previous shard becomes the last and must be keyed
|
||||
// u64::MAX.
|
||||
if boundary_idx == 0 {
|
||||
// Nothing left for this (address, storage_key) pair
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let (prev_key, prev_value) = &shards[boundary_idx - 1];
|
||||
if prev_key.sharded_key.highest_block_number != u64::MAX {
|
||||
self.delete::<tables::StoragesHistory>(prev_key.clone())?;
|
||||
self.put::<tables::StoragesHistory>(
|
||||
StorageShardedKey::last(address, storage_key),
|
||||
prev_value,
|
||||
)?;
|
||||
}
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
self.put::<tables::StoragesHistory>(
|
||||
StorageShardedKey::last(address, storage_key),
|
||||
&new_last,
|
||||
)?;
|
||||
|
||||
Ok(())
|
||||
self.unwind_history_shards_inner(
|
||||
shards,
|
||||
keep_to,
|
||||
|key| key.sharded_key.highest_block_number,
|
||||
|key| key.sharded_key.highest_block_number == u64::MAX,
|
||||
|batch, key| batch.delete::<tables::StoragesHistory>(key),
|
||||
|batch, key, value| batch.put::<tables::StoragesHistory>(key, value),
|
||||
|| StorageShardedKey::last(address, storage_key),
|
||||
)
|
||||
}
|
||||
|
||||
/// Clears all account history shards for the given address.
|
||||
|
||||
Reference in New Issue
Block a user