fix(stages): bound storage hashing stages memory (#22721)

Co-authored-by: Derek Cofausper <256792747+decofe@users.noreply.github.com>
Co-authored-by: Amp <amp@ampcode.com>
Dan Cline authored on 2026-03-04 16:55:12 -05:00, committed by GitHub
parent 26f4aab2a9
commit d8de8afa95
5 changed files with 157 additions and 57 deletions

@@ -127,12 +127,14 @@ fn unwind_and_copy<N: ProviderNodeTypes>(
AccountHashingStage {
clean_threshold: u64::MAX,
commit_threshold: u64::MAX,
commit_entries: u64::MAX,
etl_config: EtlConfig::default(),
}
.execute(&provider, execute_input)?;
StorageHashingStage {
clean_threshold: u64::MAX,
commit_threshold: u64::MAX,
commit_entries: u64::MAX,
etl_config: EtlConfig::default(),
}
.execute(&provider, execute_input)?;

@@ -282,14 +282,22 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + Hardforks + EthereumHardforks>
),
StageEnum::AccountHashing => (
Box::new(AccountHashingStage::new(
HashingConfig { clean_threshold: 1, commit_threshold: batch_size },
HashingConfig {
clean_threshold: 1,
commit_threshold: batch_size,
commit_entries: u64::MAX,
},
etl_config,
)),
None,
),
StageEnum::StorageHashing => (
Box::new(StorageHashingStage::new(
HashingConfig { clean_threshold: 1, commit_threshold: batch_size },
HashingConfig {
clean_threshold: 1,
commit_threshold: batch_size,
commit_entries: u64::MAX,
},
etl_config,
)),
None,

@@ -332,11 +332,15 @@ pub struct HashingConfig {
pub clean_threshold: u64,
/// The maximum number of entities to process before committing progress to the database.
pub commit_threshold: u64,
/// The maximum number of changeset entries to process before committing progress. The stage
/// commits after either `commit_threshold` blocks or `commit_entries` entries, whichever
/// comes first. This bounds memory usage when blocks contain many state changes.
pub commit_entries: u64,
}
impl Default for HashingConfig {
fn default() -> Self {
Self { clean_threshold: 500_000, commit_threshold: 100_000 }
Self { clean_threshold: 500_000, commit_threshold: 100_000, commit_entries: 30_000_000 }
}
}
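
The doc comment above pins down the commit rule: progress is committed after either `commit_threshold` blocks or `commit_entries` changeset entries, whichever comes first, which is what bounds memory on changeset-heavy ranges. A minimal standalone sketch of that rule, using hypothetical names (`HashingBounds`, `should_commit`) rather than the actual reth types:

```rust
// Minimal sketch (not the reth implementation): how the two commit bounds in
// `HashingConfig` interact. `HashingBounds` and `should_commit` are hypothetical.
#[derive(Debug, Clone, Copy)]
struct HashingBounds {
    /// Commit after at most this many blocks.
    commit_threshold: u64,
    /// Commit after at most this many changeset entries.
    commit_entries: u64,
}

impl Default for HashingBounds {
    fn default() -> Self {
        // Mirrors the defaults introduced by this change.
        Self { commit_threshold: 100_000, commit_entries: 30_000_000 }
    }
}

/// The stage commits once either bound is reached, whichever comes first.
fn should_commit(bounds: HashingBounds, blocks: u64, entries: u64) -> bool {
    blocks >= bounds.commit_threshold || entries >= bounds.commit_entries
}

fn main() {
    let bounds = HashingBounds::default();
    // A short range of unusually heavy blocks still triggers a commit...
    assert!(should_commit(bounds, 1_000, 30_000_000));
    // ...and a long range of light blocks commits on the block bound instead.
    assert!(should_commit(bounds, 100_000, 5_000));
    println!("{bounds:?}");
}
```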

@@ -13,11 +13,12 @@ use reth_provider::{
AccountExtReader, DBProvider, HashingWriter, StatsReader, StorageSettingsCache,
};
use reth_stages_api::{
AccountHashingCheckpoint, EntitiesCheckpoint, ExecInput, ExecOutput, Stage, StageCheckpoint,
StageError, StageId, UnwindInput, UnwindOutput,
AccountHashingCheckpoint, BlockRangeOutput, EntitiesCheckpoint, ExecInput, ExecOutput, Stage,
StageCheckpoint, StageError, StageId, UnwindInput, UnwindOutput,
};
use reth_storage_errors::provider::ProviderResult;
use std::{
collections::BTreeSet,
fmt::Debug,
ops::{Range, RangeInclusive},
sync::mpsc::{self, Receiver},
@@ -39,6 +40,9 @@ pub struct AccountHashingStage {
pub clean_threshold: u64,
/// The maximum number of accounts to process before committing during unwind.
pub commit_threshold: u64,
/// The maximum number of changeset entries to process before committing. The stage commits
/// after either `commit_threshold` blocks or `commit_entries` entries, whichever comes first.
pub commit_entries: u64,
/// ETL configuration
pub etl_config: EtlConfig,
}
@@ -49,6 +53,7 @@ impl AccountHashingStage {
Self {
clean_threshold: config.clean_threshold,
commit_threshold: config.commit_threshold,
commit_entries: config.commit_entries,
etl_config,
}
}
@@ -129,6 +134,7 @@ impl Default for AccountHashingStage {
Self {
clean_threshold: 500_000,
commit_threshold: 100_000,
commit_entries: 30_000_000,
etl_config: EtlConfig::default(),
}
}
@@ -163,16 +169,18 @@ where
return Ok(ExecOutput::done(input.checkpoint().with_block_number(input.target())));
}
let (from_block, to_block) = input.next_block_range().into_inner();
// Use the total remaining range to decide clean vs incremental.
let total_range = input.target() - input.checkpoint().block_number;
let from_block = input.next_block();
// if there are more blocks then threshold it is faster to go over Plain state and hash all
// account otherwise take changesets aggregate the sets and apply hashing to
// AccountHashing table. Also, if we start from genesis, we need to hash from scratch, as
// genesis accounts are not in changeset.
if to_block - from_block > self.clean_threshold || from_block == 1 {
if total_range > self.clean_threshold || from_block == 1 {
// if there are more blocks than threshold it is faster to go over Plain state and
// hash all accounts otherwise take changesets aggregate the sets and apply
// hashing to HashedAccounts table. Also, if we start from genesis, we need to
// hash from scratch, as genesis accounts are not in changeset.
let tx = provider.tx_ref();
// clear table, load all accounts and hash it
// clear table, load all accounts and hash them
tx.clear::<tables::HashedAccounts>()?;
let mut accounts_cursor = tx.cursor_read::<RawTable<tables::PlainAccountState>>()?;
@@ -220,27 +228,57 @@ where
hashed_account_cursor
.append(RawKey::<B256>::from_vec(key), &RawValue::<Account>::from_vec(value))?;
}
let checkpoint = StageCheckpoint::new(input.target())
.with_account_hashing_stage_checkpoint(AccountHashingCheckpoint {
progress: stage_checkpoint_progress(provider)?,
..Default::default()
});
Ok(ExecOutput { checkpoint, done: true })
} else {
// Aggregate all transition changesets and make a list of accounts that have been
// changed.
let lists = provider.changed_accounts_with_range(from_block..=to_block)?;
// Iterate over plain state and get newest value.
// Assumption we are okay to make is that plainstate represent
// `previous_stage_progress` state.
let accounts = provider.basic_accounts(lists)?;
// Insert and hash accounts to hashing table
// Stream changesets entry-by-entry, bounded by both block count
// (commit_threshold) and entry count (commit_entries), whichever comes first.
let BlockRangeOutput { block_range, is_final_range } =
input.next_block_range_with_threshold(self.commit_threshold);
let (from_block, to_block) = block_range.into_inner();
let tx = provider.tx_ref();
let mut changeset_cursor = tx.cursor_read::<tables::AccountChangeSets>()?;
let mut changed = BTreeSet::new();
let mut total_entries = 0u64;
let mut last_block = from_block;
for entry in changeset_cursor.walk_range(from_block..=to_block)? {
let (block_number, account_before) = entry?;
// Check the entry limit only at block boundaries so we never
// checkpoint mid-block (which would skip the remaining entries
// for that block on the next invocation).
if block_number != last_block && total_entries >= self.commit_entries {
break;
}
last_block = block_number;
changed.insert(account_before.address);
total_entries += 1;
}
let accounts = provider.basic_accounts(changed)?;
provider.insert_account_for_hashing(accounts)?;
let exhausted = total_entries < self.commit_entries;
let done = exhausted && is_final_range;
let progress_block = if exhausted { to_block } else { last_block };
let checkpoint = StageCheckpoint::new(progress_block)
.with_account_hashing_stage_checkpoint(AccountHashingCheckpoint {
progress: stage_checkpoint_progress(provider)?,
..Default::default()
});
Ok(ExecOutput { checkpoint, done })
}
// We finished the hashing stage, no future iterations is expected for the same block range,
// so no checkpoint is needed.
let checkpoint = StageCheckpoint::new(input.target())
.with_account_hashing_stage_checkpoint(AccountHashingCheckpoint {
progress: stage_checkpoint_progress(provider)?,
..Default::default()
});
Ok(ExecOutput { checkpoint, done: true })
}
/// Unwind the stage.
@@ -380,6 +418,7 @@ mod tests {
pub(crate) db: TestStageDB,
commit_threshold: u64,
clean_threshold: u64,
commit_entries: u64,
etl_config: EtlConfig,
}
@@ -444,6 +483,7 @@ mod tests {
db: TestStageDB::default(),
commit_threshold: 1000,
clean_threshold: 1000,
commit_entries: u64::MAX,
etl_config: EtlConfig::default(),
}
}
@@ -460,6 +500,7 @@ mod tests {
Self::S {
commit_threshold: self.commit_threshold,
clean_threshold: self.clean_threshold,
commit_entries: self.commit_entries,
etl_config: self.etl_config.clone(),
}
}
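
The incremental path above replaces the bulk `changed_accounts_with_range` read with a cursor walk that counts entries and only stops at block boundaries. A self-contained sketch of that loop shape, assuming a plain slice of `(block_number, address)` pairs instead of the changeset cursor (`walk_bounded` and its return shape are illustrative, not the reth API):

```rust
// Minimal sketch (an assumed simplification, not the reth code): walk changeset
// entries ordered by block number, stop only at a block boundary once the entry
// budget is spent, and report the last fully processed block.
fn walk_bounded<'a>(
    entries: &[(u64, &'a str)], // (block_number, changed address), sorted by block
    commit_entries: u64,
) -> (Vec<&'a str>, u64, bool) {
    let mut changed = Vec::new();
    let mut total_entries = 0u64;
    let mut last_block = entries.first().map(|(block, _)| *block).unwrap_or_default();
    for &(block_number, address) in entries {
        // Check the budget only when a new block starts, so a block's entries are
        // never split across two checkpoints.
        if block_number != last_block && total_entries >= commit_entries {
            // `last_block` is complete; the next invocation resumes after it.
            return (changed, last_block, false);
        }
        last_block = block_number;
        changed.push(address);
        total_entries += 1;
    }
    // The whole range fit within the budget.
    (changed, last_block, true)
}

fn main() {
    // Block 11 holds three entries; a budget of 2 still finishes block 11
    // and stops before block 12.
    let entries = [(10u64, "a"), (11, "b"), (11, "c"), (11, "d"), (12, "e")];
    let (changed, last_block, range_exhausted) = walk_bounded(&entries, 2);
    assert_eq!(changed, vec!["a", "b", "c", "d"]);
    assert_eq!(last_block, 11);
    assert!(!range_exhausted);
}
```

Stopping only when a new block begins means a block's entries are never split across two checkpoints, so resuming from the checkpointed block cannot skip anything.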

@@ -3,7 +3,7 @@ use itertools::Itertools;
use reth_config::config::{EtlConfig, HashingConfig};
use reth_db_api::{
cursor::{DbCursorRO, DbDupCursorRW},
models::CompactU256,
models::{BlockNumberAddress, CompactU256},
table::Decompress,
tables,
transaction::{DbTx, DbTxMut},
@@ -12,12 +12,13 @@ use reth_etl::Collector;
use reth_primitives_traits::StorageEntry;
use reth_provider::{DBProvider, HashingWriter, StatsReader, StorageReader};
use reth_stages_api::{
EntitiesCheckpoint, ExecInput, ExecOutput, Stage, StageCheckpoint, StageError, StageId,
StorageHashingCheckpoint, UnwindInput, UnwindOutput,
BlockRangeOutput, EntitiesCheckpoint, ExecInput, ExecOutput, Stage, StageCheckpoint,
StageError, StageId, StorageHashingCheckpoint, UnwindInput, UnwindOutput,
};
use reth_storage_api::StorageSettingsCache;
use reth_storage_errors::provider::ProviderResult;
use std::{
collections::{BTreeMap, BTreeSet},
fmt::Debug,
sync::mpsc::{self, Receiver},
};
@@ -42,6 +43,9 @@ pub struct StorageHashingStage {
pub clean_threshold: u64,
/// The maximum number of slots to process before committing during unwind.
pub commit_threshold: u64,
/// The maximum number of changeset entries to process before committing. The stage commits
/// after either `commit_threshold` blocks or `commit_entries` entries, whichever comes first.
pub commit_entries: u64,
/// ETL configuration
pub etl_config: EtlConfig,
}
@@ -52,6 +56,7 @@ impl StorageHashingStage {
Self {
clean_threshold: config.clean_threshold,
commit_threshold: config.commit_threshold,
commit_entries: config.commit_entries,
etl_config,
}
}
@@ -62,6 +67,7 @@ impl Default for StorageHashingStage {
Self {
clean_threshold: 500_000,
commit_threshold: 100_000,
commit_entries: 30_000_000,
etl_config: EtlConfig::default(),
}
}
@@ -93,14 +99,18 @@ where
return Ok(ExecOutput::done(input.checkpoint().with_block_number(input.target())));
}
let (from_block, to_block) = input.next_block_range().into_inner();
// Use the total remaining range to decide clean vs incremental.
let total_range = input.target() - input.checkpoint().block_number;
let from_block = input.next_block();
// if there are more blocks then threshold it is faster to go over Plain state and hash all
// account otherwise take changesets aggregate the sets and apply hashing to
// AccountHashing table. Also, if we start from genesis, we need to hash from scratch, as
// genesis accounts are not in changeset, along with their storages.
if to_block - from_block > self.clean_threshold || from_block == 1 {
// clear table, load all accounts and hash it
if total_range > self.clean_threshold || from_block == 1 {
// if there are more blocks than threshold it is faster to go over Plain state and
// hash all storage otherwise take changesets aggregate the sets and apply
// hashing to HashedStorages table. Also, if we start from genesis, we need to
// hash from scratch, as genesis accounts are not in changeset, along with their
// storages.
// clear table, load all entries and hash them
tx.clear::<tables::HashedStorages>()?;
let mut storage_cursor = tx.cursor_read::<tables::PlainStorageState>()?;
@@ -158,26 +168,58 @@ where
},
)?;
}
let checkpoint = StageCheckpoint::new(input.target())
.with_storage_hashing_stage_checkpoint(StorageHashingCheckpoint {
progress: stage_checkpoint_progress(provider)?,
..Default::default()
});
Ok(ExecOutput { checkpoint, done: true })
} else {
// Aggregate all changesets and make list of storages that have been
// changed.
let lists = provider.changed_storages_with_range(from_block..=to_block)?;
// iterate over plain state and get newest storage value.
// Assumption we are okay with is that plain state represent
// `previous_stage_progress` state.
let storages = provider.plain_state_storages(lists)?;
// Stream changesets entry-by-entry, bounded by both block count
// (commit_threshold) and entry count (commit_entries), whichever comes first.
let BlockRangeOutput { block_range, is_final_range } =
input.next_block_range_with_threshold(self.commit_threshold);
let (from_block, to_block) = block_range.into_inner();
let mut changeset_cursor = tx.cursor_read::<tables::StorageChangeSets>()?;
let mut changed: BTreeMap<Address, BTreeSet<B256>> = BTreeMap::new();
let mut total_entries = 0u64;
let mut last_block = from_block;
for entry in
changeset_cursor.walk_range(BlockNumberAddress::range(from_block..=to_block))?
{
let (BlockNumberAddress((block_number, address)), storage_entry) = entry?;
// Check the entry limit only at block boundaries so we never
// checkpoint mid-block (which would skip the remaining entries
// for that block on the next invocation).
if block_number != last_block && total_entries >= self.commit_entries {
break;
}
last_block = block_number;
changed.entry(address).or_default().insert(storage_entry.key);
total_entries += 1;
}
let storages = provider.plain_state_storages(changed)?;
provider.insert_storage_for_hashing(storages)?;
let exhausted = total_entries < self.commit_entries;
let done = exhausted && is_final_range;
let progress_block = if exhausted { to_block } else { last_block };
let checkpoint = StageCheckpoint::new(progress_block)
.with_storage_hashing_stage_checkpoint(StorageHashingCheckpoint {
progress: stage_checkpoint_progress(provider)?,
..Default::default()
});
Ok(ExecOutput { checkpoint, done })
}
// We finished the hashing stage, no future iterations is expected for the same block range,
// so no checkpoint is needed.
let checkpoint = StageCheckpoint::new(input.target())
.with_storage_hashing_stage_checkpoint(StorageHashingCheckpoint {
progress: stage_checkpoint_progress(provider)?,
..Default::default()
});
Ok(ExecOutput { checkpoint, done: true })
}
/// Unwind the stage.
@@ -323,6 +365,7 @@ mod tests {
db: TestStageDB,
commit_threshold: u64,
clean_threshold: u64,
commit_entries: u64,
etl_config: EtlConfig,
}
@@ -332,6 +375,7 @@ mod tests {
db: TestStageDB::default(),
commit_threshold: 1000,
clean_threshold: 1000,
commit_entries: u64::MAX,
etl_config: EtlConfig::default(),
}
}
@@ -348,6 +392,7 @@ mod tests {
Self::S {
commit_threshold: self.commit_threshold,
clean_threshold: self.clean_threshold,
commit_entries: self.commit_entries,
etl_config: self.etl_config.clone(),
}
}
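
Both stages derive their checkpoint the same way from the bounded walk: if the entry budget was never hit, the sub-range is exhausted and progress moves to `to_block`; otherwise progress stops at the last fully processed block and `done` stays false so the stage is invoked again for the remainder of the range. A small sketch of that bookkeeping (the `Progress` struct and `next_progress` helper are hypothetical, not reth types):

```rust
// Minimal sketch (assumed simplification): turning the bounded changeset walk
// into a stage checkpoint, as both hashing stages do after their loops.
#[derive(Debug, PartialEq)]
struct Progress {
    checkpoint_block: u64,
    done: bool,
}

fn next_progress(
    to_block: u64,        // end of the sub-range chosen by the block bound
    last_block: u64,      // last block fully walked before the loop stopped
    total_entries: u64,   // entries consumed by the walk
    commit_entries: u64,  // entry budget
    is_final_range: bool, // whether the block bound already reached the target
) -> Progress {
    // The range was exhausted if the walk never hit the entry budget.
    let exhausted = total_entries < commit_entries;
    Progress {
        checkpoint_block: if exhausted { to_block } else { last_block },
        // Only finish when both bounds are satisfied in the same pass.
        done: exhausted && is_final_range,
    }
}

fn main() {
    // Budget hit mid-range: checkpoint at the last full block, continue next pass.
    assert_eq!(
        next_progress(2_000, 1_250, 30_000_000, 30_000_000, true),
        Progress { checkpoint_block: 1_250, done: false }
    );
    // Budget not hit and the block bound reached the target: the stage is done.
    assert_eq!(
        next_progress(2_000, 2_000, 10_000, 30_000_000, true),
        Progress { checkpoint_block: 2_000, done: true }
    );
}
```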