Compare commits

...

7 Commits

Author SHA1 Message Date
Ubuntu
3868c5548e chore: remove redundant BATCH_SIZE const alias
Amp-Thread-ID: https://ampcode.com/threads/T-019c0ebb-2a8c-701e-987b-e6ef3661fc6c
2026-01-30 12:18:55 +00:00
Ubuntu
b8b3a5624b chore: clean up comments in storage_history.rs
Amp-Thread-ID: https://ampcode.com/threads/T-019c0ebb-2a8c-701e-987b-e6ef3661fc6c
2026-01-30 11:52:09 +00:00
Ubuntu
e4696f8588 refactor: remove unnecessary Vec collection in history prune
Amp-Thread-ID: https://ampcode.com/threads/T-019c0e95-9255-72cd-b5c8-2668fcc05a80
2026-01-30 11:18:50 +00:00
Ubuntu
bcbf1edcbd chore: remove OOM test instrumentation
Remove temporary info! logging added for verifying the OOM fix:
- Remove HashMap size tracking and batch progress logging
- Remove unused batch_number variables
- Restore original tracing imports (remove unused 'info')

The fix itself (batched processing) remains intact.

Amp-Thread-ID: https://ampcode.com/threads/T-019c0e8b-2503-7648-8b13-f228fa847942
2026-01-30 10:57:05 +00:00
Ubuntu
b311dda4ab chore(prune): clean up batching code
- Use consistent 200k batch size for both account and storage history
- Remove unused imports (warn)
- Initialize overall_done to false (more accurate)
- Pre-allocate HashMap with with_capacity_and_hasher
- Fix doc comment backticks for clippy::doc-markdown
- Remove redundant comments

Amp-Thread-ID: https://ampcode.com/threads/T-019c0e57-3d9e-71aa-b84a-c26cb2607de4
2026-01-30 10:42:28 +00:00
Ubuntu
aaf521a8dc remove internal batching test per review feedback
Amp-Thread-ID: https://ampcode.com/threads/T-019c0dd0-d1c3-76f2-9a17-89f1467ce153
2026-01-30 07:33:02 +00:00
Ubuntu
e862dc143d fix(prune): use internal batching to prevent OOM in history pruning
When running `reth prune` with edge flag (static files), the pruner could
OOM because the FxHashMap tracking highest deleted entries grew unboundedly.

Root cause:
- CLI sets delete_limit(usize::MAX) for unbounded pruning
- With 21M+ blocks, HashMaps accumulate hundreds of millions of unique keys
- Storage history: ~45GB (500M address+slot pairs × 90 bytes)
- Account history: ~12GB (300M addresses × 42 bytes)

Fix: Internal batching in prune_static_files() for both storage and account
history. Each batch processes up to 100-200k entries, flushes to history
indices via finalize_history_prune(), then continues with next batch.

This bounds HashMap memory to ~20-40MB per batch while still completing
all work in a single run() call. The CLI behavior is unchanged - pruning
runs to completion without returning HasMoreData for intermediate batches.

Fixes RETH-291

Amp-Thread-ID: https://ampcode.com/threads/T-019c0db5-ce32-7328-95b3-8cf90d324232
2026-01-30 07:26:56 +00:00
2 changed files with 190 additions and 76 deletions

View File

@@ -26,6 +26,10 @@ use tracing::{instrument, trace};
/// [`tables::AccountsHistory`]. We want to prune them to the same block number.
const ACCOUNT_HISTORY_TABLES_TO_PRUNE: usize = 2;
/// Maximum number of account changeset entries consumed per internal batch while pruning
/// account history from static files.
///
/// The value is used both as the bound of the per-batch read loop and as the initial capacity
/// of the per-batch `highest_deleted_accounts` `HashMap`, so that map never holds more than
/// this many keys at once. Despite the `_PER_RUN` suffix, the limit applies to each batch;
/// a single call may process many batches back to back.
const MAX_ACCOUNT_HISTORY_ENTRIES_PER_RUN: usize = 200_000;
#[derive(Debug)]
pub struct AccountHistory {
mode: PruneMode,
@@ -78,6 +82,10 @@ where
impl AccountHistory {
/// Prunes account history when changesets are stored in static files.
///
/// When no limit is provided, uses internal batching to process ALL changesets in chunks,
/// preventing OOM by limiting the size of the `highest_deleted_accounts` `HashMap` per batch.
/// When a limit is provided, respects that limit and may return `HasMoreData`.
fn prune_static_files<Provider>(
&self,
provider: &Provider,
@@ -101,52 +109,100 @@ impl AccountHistory {
))
}
// The size of this map is limited by `prune_delete_limit * blocks_since_last_run /
// ACCOUNT_HISTORY_TABLES_TO_PRUNE`, and with the current defaults it's usually `3500 * 5 /
// 2`, so 8750 entries. Each entry is `160 bit + 64 bit`, so the total size should be up to
// ~0.25MB + some hashmap overhead. `blocks_since_last_run` is additionally limited by the
// `max_reorg_depth`, so no OOM is expected here.
let mut highest_deleted_accounts = FxHashMap::default();
let mut last_changeset_pruned_block = None;
let mut pruned_changesets = 0;
let mut done = true;
let mut total_pruned = 0;
let mut actual_last_pruned_block: Option<BlockNumber> = None;
let mut overall_done = false;
let walker = StaticFileAccountChangesetWalker::new(provider, range);
for result in walker {
if limiter.is_limit_reached() {
done = false;
let mut walker = StaticFileAccountChangesetWalker::new(provider, range).peekable();
loop {
let mut highest_deleted_accounts = FxHashMap::with_capacity_and_hasher(
MAX_ACCOUNT_HISTORY_ENTRIES_PER_RUN,
Default::default(),
);
let mut batch_last_block: Option<BlockNumber> = None;
let mut batch_count = 0;
while batch_count < MAX_ACCOUNT_HISTORY_ENTRIES_PER_RUN {
// Check limit before consuming next item
if limiter.is_limit_reached() {
break;
}
match walker.next() {
Some(Ok((block_number, changeset))) => {
highest_deleted_accounts.insert(changeset.address, block_number);
batch_last_block = Some(block_number);
batch_count += 1;
limiter.increment_deleted_entries_count();
}
Some(Err(e)) => return Err(e.into()),
None => break, // Iterator exhausted
}
}
// No more data in this batch
if batch_count == 0 {
break;
}
actual_last_pruned_block = batch_last_block;
// Check if there's more data after this batch
let has_more_data = walker.peek().is_some();
// Track whether we're done for the final output
overall_done = !has_more_data;
trace!(target: "pruner", pruned = %batch_count, done = %overall_done, "Pruned account history batch (changesets from static files)");
let result = HistoryPruneResult {
highest_deleted: highest_deleted_accounts,
last_pruned_block: batch_last_block,
pruned_count: batch_count,
done: overall_done,
};
let output = finalize_history_prune::<_, tables::AccountsHistory, _, _>(
provider,
result,
range_end,
&limiter,
ShardedKey::new,
|a, b| a.key == b.key,
)?;
total_pruned += output.pruned;
// If external limit reached and there's more data, stop
// Otherwise continue to next batch (or exit if no more data)
if limiter.is_limit_reached() && has_more_data {
break;
}
let (block_number, changeset) = result?;
highest_deleted_accounts.insert(changeset.address, block_number);
last_changeset_pruned_block = Some(block_number);
pruned_changesets += 1;
limiter.increment_deleted_entries_count();
}
// Delete static file jars below the pruned block
if let Some(last_block) = last_changeset_pruned_block {
// Delete static files once at end
if let Some(last_block) = actual_last_pruned_block {
provider
.static_file_provider()
.delete_segment_below_block(StaticFileSegment::AccountChangeSets, last_block + 1)?;
}
trace!(target: "pruner", pruned = %pruned_changesets, %done, "Pruned account history (changesets from static files)");
let result = HistoryPruneResult {
highest_deleted: highest_deleted_accounts,
last_pruned_block: last_changeset_pruned_block,
pruned_count: pruned_changesets,
done,
};
finalize_history_prune::<_, tables::AccountsHistory, _, _>(
provider,
result,
range_end,
&limiter,
ShardedKey::new,
|a, b| a.key == b.key,
)
.map_err(Into::into)
trace!(target: "pruner", pruned = %total_pruned, %overall_done, "Pruned account history (changesets from static files)");
let progress = limiter.progress(overall_done);
let last_checkpoint_block = actual_last_pruned_block
.map(|block| if overall_done { block } else { block.saturating_sub(1) })
.unwrap_or(range_end);
Ok(SegmentOutput {
progress,
pruned: total_pruned,
checkpoint: Some(SegmentOutputCheckpoint {
block_number: Some(last_checkpoint_block),
tx_number: None,
}),
})
}
fn prune_database<Provider>(

View File

@@ -27,6 +27,10 @@ use tracing::{instrument, trace};
/// [`tables::StoragesHistory`]. We want to prune them to the same block number.
const STORAGE_HISTORY_TABLES_TO_PRUNE: usize = 2;
/// Maximum number of storage changeset entries consumed per internal batch while pruning
/// storage history from static files.
///
/// The value is used both as the bound of the per-batch read loop and as the initial capacity
/// of the per-batch `highest_deleted_storages` `HashMap`, so that map never holds more than
/// this many `(address, storage_key)` keys at once. Despite the `_PER_RUN` suffix, the limit
/// applies to each batch; a single call may process many batches back to back.
const MAX_STORAGE_HISTORY_ENTRIES_PER_RUN: usize = 200_000;
#[derive(Debug)]
pub struct StorageHistory {
mode: PruneMode,
@@ -78,6 +82,11 @@ where
impl StorageHistory {
/// Prunes storage history when changesets are stored in static files.
///
/// Uses internal batching to bound memory usage of the `highest_deleted_storages` `HashMap`.
/// Each batch processes up to [`MAX_STORAGE_HISTORY_ENTRIES_PER_RUN`] entries, then flushes
/// to the history index before continuing. This prevents OOM when pruning large ranges with
/// many unique (address, `storage_key`) pairs.
fn prune_static_files<Provider>(
&self,
provider: &Provider,
@@ -101,56 +110,105 @@ impl StorageHistory {
))
}
// The size of this map is limited by `prune_delete_limit * blocks_since_last_run /
// STORAGE_HISTORY_TABLES_TO_PRUNE`, and with current defaults it's usually `3500 * 5
// / 2`, so 8750 entries. Each entry is `160 bit + 256 bit + 64 bit`, so the total
// size should be up to ~0.5MB + some hashmap overhead. `blocks_since_last_run` is
// additionally limited by the `max_reorg_depth`, so no OOM is expected here.
let mut highest_deleted_storages = FxHashMap::default();
let mut last_changeset_pruned_block = None;
let mut pruned_changesets = 0;
let mut done = true;
let mut total_pruned = 0;
let mut actual_last_pruned_block: Option<BlockNumber> = None;
let mut overall_done = false;
let walker = provider.static_file_provider().walk_storage_changeset_range(range);
for result in walker {
if limiter.is_limit_reached() {
done = false;
let mut walker =
provider.static_file_provider().walk_storage_changeset_range(range).peekable();
loop {
let mut highest_deleted_storages = FxHashMap::with_capacity_and_hasher(
MAX_STORAGE_HISTORY_ENTRIES_PER_RUN,
Default::default(),
);
let mut batch_last_block: Option<BlockNumber> = None;
let mut batch_count = 0;
while batch_count < MAX_STORAGE_HISTORY_ENTRIES_PER_RUN {
// Check limit before consuming next item
if limiter.is_limit_reached() {
break;
}
match walker.next() {
Some(Ok((block_address, entry))) => {
let block_number = block_address.block_number();
let address = block_address.address();
highest_deleted_storages.insert((address, entry.key), block_number);
batch_last_block = Some(block_number);
batch_count += 1;
limiter.increment_deleted_entries_count();
}
Some(Err(e)) => return Err(e.into()),
None => break, // Iterator exhausted
}
}
// No more data in this batch
if batch_count == 0 {
break;
}
actual_last_pruned_block = batch_last_block;
// Check if there's more data after this batch
let has_more_data = walker.peek().is_some();
// Track whether we're done for the final output
overall_done = !has_more_data;
trace!(target: "pruner", pruned = %batch_count, done = %overall_done, "Pruned storage history batch (changesets from static files)");
let result = HistoryPruneResult {
highest_deleted: highest_deleted_storages,
last_pruned_block: batch_last_block,
pruned_count: batch_count,
done: overall_done,
};
let output = finalize_history_prune::<_, tables::StoragesHistory, (Address, B256), _>(
provider,
result,
range_end,
&limiter,
|(address, storage_key), block_number| {
StorageShardedKey::new(address, storage_key, block_number)
},
|a, b| a.address == b.address && a.sharded_key.key == b.sharded_key.key,
)?;
total_pruned += output.pruned;
// If external limit reached and there's more data, stop
// Otherwise continue to next batch (or exit if no more data)
if limiter.is_limit_reached() && has_more_data {
break;
}
let (block_address, entry) = result?;
let block_number = block_address.block_number();
let address = block_address.address();
highest_deleted_storages.insert((address, entry.key), block_number);
last_changeset_pruned_block = Some(block_number);
pruned_changesets += 1;
limiter.increment_deleted_entries_count();
}
// Delete static file jars below the pruned block
if let Some(last_block) = last_changeset_pruned_block {
// Delete static file jars below the pruned block (once at end)
if let Some(last_block) = actual_last_pruned_block {
provider
.static_file_provider()
.delete_segment_below_block(StaticFileSegment::StorageChangeSets, last_block + 1)?;
}
trace!(target: "pruner", pruned = %pruned_changesets, %done, "Pruned storage history (changesets from static files)");
let result = HistoryPruneResult {
highest_deleted: highest_deleted_storages,
last_pruned_block: last_changeset_pruned_block,
pruned_count: pruned_changesets,
done,
};
finalize_history_prune::<_, tables::StoragesHistory, (Address, B256), _>(
provider,
result,
range_end,
&limiter,
|(address, storage_key), block_number| {
StorageShardedKey::new(address, storage_key, block_number)
},
|a, b| a.address == b.address && a.sharded_key.key == b.sharded_key.key,
)
.map_err(Into::into)
trace!(target: "pruner", pruned = %total_pruned, %overall_done, "Pruned storage history (changesets from static files)");
let progress = limiter.progress(overall_done);
let last_checkpoint_block = actual_last_pruned_block
.map(|block| if overall_done { block } else { block.saturating_sub(1) })
.unwrap_or(range_end);
Ok(SegmentOutput {
progress,
pruned: total_pruned,
checkpoint: Some(SegmentOutputCheckpoint {
block_number: Some(last_checkpoint_block),
tx_number: None,
}),
})
}
fn prune_database<Provider>(