mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-04-30 03:01:58 -04:00
Compare commits
7 Commits
dan/static
...
pr-21618
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3868c5548e | ||
|
|
b8b3a5624b | ||
|
|
e4696f8588 | ||
|
|
bcbf1edcbd | ||
|
|
b311dda4ab | ||
|
|
aaf521a8dc | ||
|
|
e862dc143d |
@@ -26,6 +26,10 @@ use tracing::{instrument, trace};
|
||||
/// [`tables::AccountsHistory`]. We want to prune them to the same block number.
|
||||
const ACCOUNT_HISTORY_TABLES_TO_PRUNE: usize = 2;
|
||||
|
||||
/// Maximum entries to process per internal batch for account history.
|
||||
/// This bounds memory usage of the `highest_deleted_accounts` `HashMap`.
|
||||
const MAX_ACCOUNT_HISTORY_ENTRIES_PER_RUN: usize = 200_000;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct AccountHistory {
|
||||
mode: PruneMode,
|
||||
@@ -78,6 +82,10 @@ where
|
||||
|
||||
impl AccountHistory {
|
||||
/// Prunes account history when changesets are stored in static files.
|
||||
///
|
||||
/// When no limit is provided, uses internal batching to process ALL changesets in chunks,
|
||||
/// preventing OOM by limiting the size of the `highest_deleted_accounts` `HashMap` per batch.
|
||||
/// When a limit is provided, respects that limit and may return `HasMoreData`.
|
||||
fn prune_static_files<Provider>(
|
||||
&self,
|
||||
provider: &Provider,
|
||||
@@ -101,52 +109,100 @@ impl AccountHistory {
|
||||
))
|
||||
}
|
||||
|
||||
// The size of this map is limited by `prune_delete_limit * blocks_since_last_run /
|
||||
// ACCOUNT_HISTORY_TABLES_TO_PRUNE`, and with the current defaults it's usually `3500 * 5 /
|
||||
// 2`, so 8750 entries. Each entry is `160 bit + 64 bit`, so the total size should be up to
|
||||
// ~0.25MB + some hashmap overhead. `blocks_since_last_run` is additionally limited by the
|
||||
// `max_reorg_depth`, so no OOM is expected here.
|
||||
let mut highest_deleted_accounts = FxHashMap::default();
|
||||
let mut last_changeset_pruned_block = None;
|
||||
let mut pruned_changesets = 0;
|
||||
let mut done = true;
|
||||
let mut total_pruned = 0;
|
||||
let mut actual_last_pruned_block: Option<BlockNumber> = None;
|
||||
let mut overall_done = false;
|
||||
|
||||
let walker = StaticFileAccountChangesetWalker::new(provider, range);
|
||||
for result in walker {
|
||||
if limiter.is_limit_reached() {
|
||||
done = false;
|
||||
let mut walker = StaticFileAccountChangesetWalker::new(provider, range).peekable();
|
||||
|
||||
loop {
|
||||
let mut highest_deleted_accounts = FxHashMap::with_capacity_and_hasher(
|
||||
MAX_ACCOUNT_HISTORY_ENTRIES_PER_RUN,
|
||||
Default::default(),
|
||||
);
|
||||
let mut batch_last_block: Option<BlockNumber> = None;
|
||||
let mut batch_count = 0;
|
||||
|
||||
while batch_count < MAX_ACCOUNT_HISTORY_ENTRIES_PER_RUN {
|
||||
// Check limit before consuming next item
|
||||
if limiter.is_limit_reached() {
|
||||
break;
|
||||
}
|
||||
|
||||
match walker.next() {
|
||||
Some(Ok((block_number, changeset))) => {
|
||||
highest_deleted_accounts.insert(changeset.address, block_number);
|
||||
batch_last_block = Some(block_number);
|
||||
batch_count += 1;
|
||||
limiter.increment_deleted_entries_count();
|
||||
}
|
||||
Some(Err(e)) => return Err(e.into()),
|
||||
None => break, // Iterator exhausted
|
||||
}
|
||||
}
|
||||
|
||||
// Nothing was consumed in this batch: the walker is exhausted or the limit was already reached
|
||||
if batch_count == 0 {
|
||||
break;
|
||||
}
|
||||
|
||||
actual_last_pruned_block = batch_last_block;
|
||||
|
||||
// Check if there's more data after this batch
|
||||
let has_more_data = walker.peek().is_some();
|
||||
|
||||
// Track whether we're done for the final output
|
||||
overall_done = !has_more_data;
|
||||
|
||||
trace!(target: "pruner", pruned = %batch_count, done = %overall_done, "Pruned account history batch (changesets from static files)");
|
||||
|
||||
let result = HistoryPruneResult {
|
||||
highest_deleted: highest_deleted_accounts,
|
||||
last_pruned_block: batch_last_block,
|
||||
pruned_count: batch_count,
|
||||
done: overall_done,
|
||||
};
|
||||
|
||||
let output = finalize_history_prune::<_, tables::AccountsHistory, _, _>(
|
||||
provider,
|
||||
result,
|
||||
range_end,
|
||||
&limiter,
|
||||
ShardedKey::new,
|
||||
|a, b| a.key == b.key,
|
||||
)?;
|
||||
|
||||
total_pruned += output.pruned;
|
||||
|
||||
// If external limit reached and there's more data, stop
|
||||
// Otherwise continue to next batch (or exit if no more data)
|
||||
if limiter.is_limit_reached() && has_more_data {
|
||||
break;
|
||||
}
|
||||
let (block_number, changeset) = result?;
|
||||
highest_deleted_accounts.insert(changeset.address, block_number);
|
||||
last_changeset_pruned_block = Some(block_number);
|
||||
pruned_changesets += 1;
|
||||
limiter.increment_deleted_entries_count();
|
||||
}
|
||||
|
||||
// Delete static file jars below the pruned block
|
||||
if let Some(last_block) = last_changeset_pruned_block {
|
||||
// Delete static files once at end
|
||||
if let Some(last_block) = actual_last_pruned_block {
|
||||
provider
|
||||
.static_file_provider()
|
||||
.delete_segment_below_block(StaticFileSegment::AccountChangeSets, last_block + 1)?;
|
||||
}
|
||||
trace!(target: "pruner", pruned = %pruned_changesets, %done, "Pruned account history (changesets from static files)");
|
||||
|
||||
let result = HistoryPruneResult {
|
||||
highest_deleted: highest_deleted_accounts,
|
||||
last_pruned_block: last_changeset_pruned_block,
|
||||
pruned_count: pruned_changesets,
|
||||
done,
|
||||
};
|
||||
finalize_history_prune::<_, tables::AccountsHistory, _, _>(
|
||||
provider,
|
||||
result,
|
||||
range_end,
|
||||
&limiter,
|
||||
ShardedKey::new,
|
||||
|a, b| a.key == b.key,
|
||||
)
|
||||
.map_err(Into::into)
|
||||
trace!(target: "pruner", pruned = %total_pruned, %overall_done, "Pruned account history (changesets from static files)");
|
||||
|
||||
let progress = limiter.progress(overall_done);
|
||||
let last_checkpoint_block = actual_last_pruned_block
|
||||
.map(|block| if overall_done { block } else { block.saturating_sub(1) })
|
||||
.unwrap_or(range_end);
|
||||
|
||||
Ok(SegmentOutput {
|
||||
progress,
|
||||
pruned: total_pruned,
|
||||
checkpoint: Some(SegmentOutputCheckpoint {
|
||||
block_number: Some(last_checkpoint_block),
|
||||
tx_number: None,
|
||||
}),
|
||||
})
|
||||
}
|
||||
|
||||
fn prune_database<Provider>(
|
||||
|
||||
@@ -27,6 +27,10 @@ use tracing::{instrument, trace};
|
||||
/// [`tables::StoragesHistory`]. We want to prune them to the same block number.
|
||||
const STORAGE_HISTORY_TABLES_TO_PRUNE: usize = 2;
|
||||
|
||||
/// Maximum entries to process per internal batch for storage history.
|
||||
/// This bounds memory usage of the `highest_deleted_storages` `HashMap`.
|
||||
const MAX_STORAGE_HISTORY_ENTRIES_PER_RUN: usize = 200_000;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct StorageHistory {
|
||||
mode: PruneMode,
|
||||
@@ -78,6 +82,11 @@ where
|
||||
|
||||
impl StorageHistory {
|
||||
/// Prunes storage history when changesets are stored in static files.
|
||||
///
|
||||
/// Uses internal batching to bound memory usage of the `highest_deleted_storages` `HashMap`.
|
||||
/// Each batch processes up to [`MAX_STORAGE_HISTORY_ENTRIES_PER_RUN`] entries, then flushes
|
||||
/// to the history index before continuing. This prevents OOM when pruning large ranges with
|
||||
/// many unique (address, `storage_key`) pairs.
|
||||
fn prune_static_files<Provider>(
|
||||
&self,
|
||||
provider: &Provider,
|
||||
@@ -101,56 +110,105 @@ impl StorageHistory {
|
||||
))
|
||||
}
|
||||
|
||||
// The size of this map is limited by `prune_delete_limit * blocks_since_last_run /
|
||||
// STORAGE_HISTORY_TABLES_TO_PRUNE`, and with current defaults it's usually `3500 * 5
|
||||
// / 2`, so 8750 entries. Each entry is `160 bit + 256 bit + 64 bit`, so the total
|
||||
// size should be up to ~0.5MB + some hashmap overhead. `blocks_since_last_run` is
|
||||
// additionally limited by the `max_reorg_depth`, so no OOM is expected here.
|
||||
let mut highest_deleted_storages = FxHashMap::default();
|
||||
let mut last_changeset_pruned_block = None;
|
||||
let mut pruned_changesets = 0;
|
||||
let mut done = true;
|
||||
let mut total_pruned = 0;
|
||||
let mut actual_last_pruned_block: Option<BlockNumber> = None;
|
||||
let mut overall_done = false;
|
||||
|
||||
let walker = provider.static_file_provider().walk_storage_changeset_range(range);
|
||||
for result in walker {
|
||||
if limiter.is_limit_reached() {
|
||||
done = false;
|
||||
let mut walker =
|
||||
provider.static_file_provider().walk_storage_changeset_range(range).peekable();
|
||||
|
||||
loop {
|
||||
let mut highest_deleted_storages = FxHashMap::with_capacity_and_hasher(
|
||||
MAX_STORAGE_HISTORY_ENTRIES_PER_RUN,
|
||||
Default::default(),
|
||||
);
|
||||
let mut batch_last_block: Option<BlockNumber> = None;
|
||||
let mut batch_count = 0;
|
||||
|
||||
while batch_count < MAX_STORAGE_HISTORY_ENTRIES_PER_RUN {
|
||||
// Check limit before consuming next item
|
||||
if limiter.is_limit_reached() {
|
||||
break;
|
||||
}
|
||||
|
||||
match walker.next() {
|
||||
Some(Ok((block_address, entry))) => {
|
||||
let block_number = block_address.block_number();
|
||||
let address = block_address.address();
|
||||
highest_deleted_storages.insert((address, entry.key), block_number);
|
||||
batch_last_block = Some(block_number);
|
||||
batch_count += 1;
|
||||
limiter.increment_deleted_entries_count();
|
||||
}
|
||||
Some(Err(e)) => return Err(e.into()),
|
||||
None => break, // Iterator exhausted
|
||||
}
|
||||
}
|
||||
|
||||
// Nothing was consumed in this batch: the walker is exhausted or the limit was already reached
|
||||
if batch_count == 0 {
|
||||
break;
|
||||
}
|
||||
|
||||
actual_last_pruned_block = batch_last_block;
|
||||
|
||||
// Check if there's more data after this batch
|
||||
let has_more_data = walker.peek().is_some();
|
||||
|
||||
// Track whether we're done for the final output
|
||||
overall_done = !has_more_data;
|
||||
|
||||
trace!(target: "pruner", pruned = %batch_count, done = %overall_done, "Pruned storage history batch (changesets from static files)");
|
||||
|
||||
let result = HistoryPruneResult {
|
||||
highest_deleted: highest_deleted_storages,
|
||||
last_pruned_block: batch_last_block,
|
||||
pruned_count: batch_count,
|
||||
done: overall_done,
|
||||
};
|
||||
|
||||
let output = finalize_history_prune::<_, tables::StoragesHistory, (Address, B256), _>(
|
||||
provider,
|
||||
result,
|
||||
range_end,
|
||||
&limiter,
|
||||
|(address, storage_key), block_number| {
|
||||
StorageShardedKey::new(address, storage_key, block_number)
|
||||
},
|
||||
|a, b| a.address == b.address && a.sharded_key.key == b.sharded_key.key,
|
||||
)?;
|
||||
|
||||
total_pruned += output.pruned;
|
||||
|
||||
// If external limit reached and there's more data, stop
|
||||
// Otherwise continue to next batch (or exit if no more data)
|
||||
if limiter.is_limit_reached() && has_more_data {
|
||||
break;
|
||||
}
|
||||
let (block_address, entry) = result?;
|
||||
let block_number = block_address.block_number();
|
||||
let address = block_address.address();
|
||||
highest_deleted_storages.insert((address, entry.key), block_number);
|
||||
last_changeset_pruned_block = Some(block_number);
|
||||
pruned_changesets += 1;
|
||||
limiter.increment_deleted_entries_count();
|
||||
}
|
||||
|
||||
// Delete static file jars below the pruned block
|
||||
if let Some(last_block) = last_changeset_pruned_block {
|
||||
// Delete static file jars below the pruned block (once at end)
|
||||
if let Some(last_block) = actual_last_pruned_block {
|
||||
provider
|
||||
.static_file_provider()
|
||||
.delete_segment_below_block(StaticFileSegment::StorageChangeSets, last_block + 1)?;
|
||||
}
|
||||
trace!(target: "pruner", pruned = %pruned_changesets, %done, "Pruned storage history (changesets from static files)");
|
||||
|
||||
let result = HistoryPruneResult {
|
||||
highest_deleted: highest_deleted_storages,
|
||||
last_pruned_block: last_changeset_pruned_block,
|
||||
pruned_count: pruned_changesets,
|
||||
done,
|
||||
};
|
||||
finalize_history_prune::<_, tables::StoragesHistory, (Address, B256), _>(
|
||||
provider,
|
||||
result,
|
||||
range_end,
|
||||
&limiter,
|
||||
|(address, storage_key), block_number| {
|
||||
StorageShardedKey::new(address, storage_key, block_number)
|
||||
},
|
||||
|a, b| a.address == b.address && a.sharded_key.key == b.sharded_key.key,
|
||||
)
|
||||
.map_err(Into::into)
|
||||
trace!(target: "pruner", pruned = %total_pruned, %overall_done, "Pruned storage history (changesets from static files)");
|
||||
|
||||
let progress = limiter.progress(overall_done);
|
||||
let last_checkpoint_block = actual_last_pruned_block
|
||||
.map(|block| if overall_done { block } else { block.saturating_sub(1) })
|
||||
.unwrap_or(range_end);
|
||||
|
||||
Ok(SegmentOutput {
|
||||
progress,
|
||||
pruned: total_pruned,
|
||||
checkpoint: Some(SegmentOutputCheckpoint {
|
||||
block_number: Some(last_checkpoint_block),
|
||||
tx_number: None,
|
||||
}),
|
||||
})
|
||||
}
|
||||
|
||||
fn prune_database<Provider>(
|
||||
|
||||
Reference in New Issue
Block a user