From d8de8afa95aed287ad5fcab7d49cbf92e4131ade Mon Sep 17 00:00:00 2001
From: Dan Cline <6798349+Rjected@users.noreply.github.com>
Date: Wed, 4 Mar 2026 16:55:12 -0500
Subject: [PATCH] fix(stages): bound storage hashing stages memory (#22721)

Co-authored-by: Derek Cofausper <256792747+decofe@users.noreply.github.com>
Co-authored-by: Amp
---
 crates/cli/commands/src/stage/dump/merkle.rs |  2 +
 crates/cli/commands/src/stage/run.rs         | 12 ++-
 crates/config/src/config.rs                  |  6 +-
 .../stages/src/stages/hashing_account.rs     | 95 +++++++++++++-----
 .../stages/src/stages/hashing_storage.rs     | 99 ++++++++++++++-----
 5 files changed, 157 insertions(+), 57 deletions(-)

diff --git a/crates/cli/commands/src/stage/dump/merkle.rs b/crates/cli/commands/src/stage/dump/merkle.rs
index 0928ea2e84..453242b777 100644
--- a/crates/cli/commands/src/stage/dump/merkle.rs
+++ b/crates/cli/commands/src/stage/dump/merkle.rs
@@ -127,12 +127,14 @@ fn unwind_and_copy(
     AccountHashingStage {
         clean_threshold: u64::MAX,
         commit_threshold: u64::MAX,
+        commit_entries: u64::MAX,
         etl_config: EtlConfig::default(),
     }
     .execute(&provider, execute_input)?;
     StorageHashingStage {
         clean_threshold: u64::MAX,
         commit_threshold: u64::MAX,
+        commit_entries: u64::MAX,
         etl_config: EtlConfig::default(),
     }
     .execute(&provider, execute_input)?;
diff --git a/crates/cli/commands/src/stage/run.rs b/crates/cli/commands/src/stage/run.rs
index f23ca3adef..55655ed729 100644
--- a/crates/cli/commands/src/stage/run.rs
+++ b/crates/cli/commands/src/stage/run.rs
@@ -282,14 +282,22 @@ impl
             ),
             StageEnum::AccountHashing => (
                 Box::new(AccountHashingStage::new(
-                    HashingConfig { clean_threshold: 1, commit_threshold: batch_size },
+                    HashingConfig {
+                        clean_threshold: 1,
+                        commit_threshold: batch_size,
+                        commit_entries: u64::MAX,
+                    },
                     etl_config,
                 )),
                 None,
             ),
             StageEnum::StorageHashing => (
                 Box::new(StorageHashingStage::new(
-                    HashingConfig { clean_threshold: 1, commit_threshold: batch_size },
+                    HashingConfig {
+                        clean_threshold: 1,
+                        commit_threshold: batch_size,
+                        commit_entries: u64::MAX,
+                    },
                     etl_config,
                 )),
                 None,
diff --git a/crates/config/src/config.rs b/crates/config/src/config.rs
index b7e9d705de..820bd6b147 100644
--- a/crates/config/src/config.rs
+++ b/crates/config/src/config.rs
@@ -332,11 +332,15 @@ pub struct HashingConfig {
     pub clean_threshold: u64,
     /// The maximum number of entities to process before committing progress to the database.
     pub commit_threshold: u64,
+    /// The maximum number of changeset entries to process before committing progress. The stage
+    /// commits after either `commit_threshold` blocks or `commit_entries` entries, whichever
+    /// comes first. This bounds memory usage when blocks contain many state changes.
+    pub commit_entries: u64,
 }
 
 impl Default for HashingConfig {
     fn default() -> Self {
-        Self { clean_threshold: 500_000, commit_threshold: 100_000 }
+        Self { clean_threshold: 500_000, commit_threshold: 100_000, commit_entries: 30_000_000 }
     }
 }
diff --git a/crates/stages/stages/src/stages/hashing_account.rs b/crates/stages/stages/src/stages/hashing_account.rs
index 72cd5c3217..2410e8131f 100644
--- a/crates/stages/stages/src/stages/hashing_account.rs
+++ b/crates/stages/stages/src/stages/hashing_account.rs
@@ -13,11 +13,12 @@ use reth_provider::{
     AccountExtReader, DBProvider, HashingWriter, StatsReader, StorageSettingsCache,
 };
 use reth_stages_api::{
-    AccountHashingCheckpoint, EntitiesCheckpoint, ExecInput, ExecOutput, Stage, StageCheckpoint,
-    StageError, StageId, UnwindInput, UnwindOutput,
+    AccountHashingCheckpoint, BlockRangeOutput, EntitiesCheckpoint, ExecInput, ExecOutput, Stage,
+    StageCheckpoint, StageError, StageId, UnwindInput, UnwindOutput,
 };
 use reth_storage_errors::provider::ProviderResult;
 use std::{
+    collections::BTreeSet,
     fmt::Debug,
     ops::{Range, RangeInclusive},
     sync::mpsc::{self, Receiver},
@@ -39,6 +40,9 @@ pub struct AccountHashingStage {
     pub clean_threshold: u64,
     /// The maximum number of accounts to process before committing during unwind.
     pub commit_threshold: u64,
+    /// The maximum number of changeset entries to process before committing. The stage commits
+    /// after either `commit_threshold` blocks or `commit_entries` entries, whichever comes first.
+    pub commit_entries: u64,
     /// ETL configuration
     pub etl_config: EtlConfig,
 }
@@ -49,6 +53,7 @@ impl AccountHashingStage {
         Self {
             clean_threshold: config.clean_threshold,
             commit_threshold: config.commit_threshold,
+            commit_entries: config.commit_entries,
             etl_config,
         }
     }
@@ -129,6 +134,7 @@ impl Default for AccountHashingStage {
         Self {
             clean_threshold: 500_000,
             commit_threshold: 100_000,
+            commit_entries: 30_000_000,
             etl_config: EtlConfig::default(),
         }
     }
@@ -163,16 +169,18 @@ where
             return Ok(ExecOutput::done(input.checkpoint().with_block_number(input.target())));
         }
 
-        let (from_block, to_block) = input.next_block_range().into_inner();
+        // Use the total remaining range to decide clean vs incremental.
+        let total_range = input.target() - input.checkpoint().block_number;
+        let from_block = input.next_block();
 
-        // if there are more blocks then threshold it is faster to go over Plain state and hash all
-        // account otherwise take changesets aggregate the sets and apply hashing to
-        // AccountHashing table. Also, if we start from genesis, we need to hash from scratch, as
-        // genesis accounts are not in changeset.
-        if to_block - from_block > self.clean_threshold || from_block == 1 {
+        if total_range > self.clean_threshold || from_block == 1 {
+            // If there are more blocks than the threshold, it is faster to go over plain state
+            // and hash all accounts; otherwise, take the changesets, aggregate the sets, and
+            // apply hashing to the HashedAccounts table. Also, if we start from genesis, we
+            // need to hash from scratch, as genesis accounts are not in the changesets.
             let tx = provider.tx_ref();
-            // clear table, load all accounts and hash it
+            // clear table, load all accounts and hash them
             tx.clear::<tables::HashedAccounts>()?;
 
             let mut accounts_cursor = tx.cursor_read::<RawTable<tables::PlainAccountState>>()?;
@@ -220,27 +228,57 @@ where
                 hashed_account_cursor
                     .append(RawKey::<B256>::from_vec(key), &RawValue::<Account>::from_vec(value))?;
             }
+
+            let checkpoint = StageCheckpoint::new(input.target())
+                .with_account_hashing_stage_checkpoint(AccountHashingCheckpoint {
+                    progress: stage_checkpoint_progress(provider)?,
+                    ..Default::default()
+                });
+
+            Ok(ExecOutput { checkpoint, done: true })
         } else {
-            // Aggregate all transition changesets and make a list of accounts that have been
-            // changed.
-            let lists = provider.changed_accounts_with_range(from_block..=to_block)?;
-            // Iterate over plain state and get newest value.
-            // Assumption we are okay to make is that plainstate represent
-            // `previous_stage_progress` state.
-            let accounts = provider.basic_accounts(lists)?;
-            // Insert and hash accounts to hashing table
+            // Stream changesets entry-by-entry, bounded by both block count
+            // (commit_threshold) and entry count (commit_entries), whichever comes first.
+            let BlockRangeOutput { block_range, is_final_range } =
+                input.next_block_range_with_threshold(self.commit_threshold);
+            let (from_block, to_block) = block_range.into_inner();
+
+            let tx = provider.tx_ref();
+            let mut changeset_cursor = tx.cursor_read::<tables::AccountChangeSets>()?;
+            let mut changed = BTreeSet::new();
+            let mut total_entries = 0u64;
+            let mut last_block = from_block;
+
+            for entry in changeset_cursor.walk_range(from_block..=to_block)? {
+                let (block_number, account_before) = entry?;
+
+                // Check the entry limit only at block boundaries so we never
+                // checkpoint mid-block (which would skip the remaining entries
+                // for that block on the next invocation).
+                if block_number != last_block && total_entries >= self.commit_entries {
+                    break;
+                }
+
+                last_block = block_number;
+                changed.insert(account_before.address);
+                total_entries += 1;
+            }
+
+            let accounts = provider.basic_accounts(changed)?;
             provider.insert_account_for_hashing(accounts)?;
+
+            let exhausted = total_entries < self.commit_entries;
+            let done = exhausted && is_final_range;
+            let progress_block = if exhausted { to_block } else { last_block };
+
+            let checkpoint = StageCheckpoint::new(progress_block)
+                .with_account_hashing_stage_checkpoint(AccountHashingCheckpoint {
+                    progress: stage_checkpoint_progress(provider)?,
+                    ..Default::default()
+                });
+
+            Ok(ExecOutput { checkpoint, done })
         }
-
-        // We finished the hashing stage, no future iterations is expected for the same block range,
-        // so no checkpoint is needed.
-        let checkpoint = StageCheckpoint::new(input.target())
-            .with_account_hashing_stage_checkpoint(AccountHashingCheckpoint {
-                progress: stage_checkpoint_progress(provider)?,
-                ..Default::default()
-            });
-
-        Ok(ExecOutput { checkpoint, done: true })
     }
 
     /// Unwind the stage.
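
The safety argument for `commit_entries` is the block-boundary rule above: the walk may only stop between blocks, so a checkpoint at `last_block` never strands half of a block's changeset. A minimal, self-contained sketch of that invariant, using plain `u64` block numbers and `&str` stand-ins for addresses (`walk_bounded` is a hypothetical helper, not part of the patch):

```rust
use std::collections::BTreeSet;

/// Walk (block, address) changeset entries, stopping only at a block
/// boundary once the entry budget is spent. Returns the deduplicated
/// addresses, the last fully processed block, and the entry count.
fn walk_bounded<'a>(
    entries: &[(u64, &'a str)],
    commit_entries: u64,
) -> (BTreeSet<&'a str>, u64, u64) {
    let mut changed = BTreeSet::new();
    let mut total_entries = 0u64;
    let mut last_block = entries.first().map(|&(b, _)| b).unwrap_or_default();

    for &(block_number, address) in entries {
        // Break only between blocks; breaking mid-block would drop the
        // rest of that block's entries on the next invocation.
        if block_number != last_block && total_entries >= commit_entries {
            break;
        }
        last_block = block_number;
        changed.insert(address);
        total_entries += 1;
    }
    (changed, last_block, total_entries)
}

fn main() {
    // Block 2 carries three entries; a budget of 2 still ingests all of
    // block 2 and stops before block 3.
    let entries = [(1, "a"), (2, "b"), (2, "c"), (2, "a"), (3, "d")];
    let (changed, last_block, total) = walk_bounded(&entries, 2);
    assert_eq!((last_block, total), (2, 4));
    assert_eq!(changed.into_iter().collect::<Vec<_>>(), ["a", "b", "c"]);
}
```

The trade-off is that the budget is soft: the walk can overshoot by at most one block's worth of entries, and in exchange resumption from `progress_block + 1` is always correct.
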
@@ -380,6 +418,7 @@ mod tests {
         pub(crate) db: TestStageDB,
         commit_threshold: u64,
         clean_threshold: u64,
+        commit_entries: u64,
         etl_config: EtlConfig,
     }
 
@@ -444,6 +483,7 @@ mod tests {
                 db: TestStageDB::default(),
                 commit_threshold: 1000,
                 clean_threshold: 1000,
+                commit_entries: u64::MAX,
                 etl_config: EtlConfig::default(),
             }
         }
@@ -460,6 +500,7 @@ mod tests {
             Self::S {
                 commit_threshold: self.commit_threshold,
                 clean_threshold: self.clean_threshold,
+                commit_entries: self.commit_entries,
                 etl_config: self.etl_config.clone(),
             }
         }
diff --git a/crates/stages/stages/src/stages/hashing_storage.rs b/crates/stages/stages/src/stages/hashing_storage.rs
index 749a697d7a..59913bb452 100644
--- a/crates/stages/stages/src/stages/hashing_storage.rs
+++ b/crates/stages/stages/src/stages/hashing_storage.rs
@@ -3,7 +3,7 @@ use itertools::Itertools;
 use reth_config::config::{EtlConfig, HashingConfig};
 use reth_db_api::{
     cursor::{DbCursorRO, DbDupCursorRW},
-    models::CompactU256,
+    models::{BlockNumberAddress, CompactU256},
     table::Decompress,
     tables,
     transaction::{DbTx, DbTxMut},
@@ -12,12 +12,13 @@ use reth_etl::Collector;
 use reth_primitives_traits::StorageEntry;
 use reth_provider::{DBProvider, HashingWriter, StatsReader, StorageReader};
 use reth_stages_api::{
-    EntitiesCheckpoint, ExecInput, ExecOutput, Stage, StageCheckpoint, StageError, StageId,
-    StorageHashingCheckpoint, UnwindInput, UnwindOutput,
+    BlockRangeOutput, EntitiesCheckpoint, ExecInput, ExecOutput, Stage, StageCheckpoint,
+    StageError, StageId, StorageHashingCheckpoint, UnwindInput, UnwindOutput,
 };
 use reth_storage_api::StorageSettingsCache;
 use reth_storage_errors::provider::ProviderResult;
 use std::{
+    collections::{BTreeMap, BTreeSet},
     fmt::Debug,
     sync::mpsc::{self, Receiver},
 };
@@ -42,6 +43,9 @@ pub struct StorageHashingStage {
     pub clean_threshold: u64,
     /// The maximum number of slots to process before committing during unwind.
     pub commit_threshold: u64,
+    /// The maximum number of changeset entries to process before committing. The stage commits
+    /// after either `commit_threshold` blocks or `commit_entries` entries, whichever comes first.
+    pub commit_entries: u64,
     /// ETL configuration
     pub etl_config: EtlConfig,
 }
@@ -52,6 +56,7 @@ impl StorageHashingStage {
         Self {
             clean_threshold: config.clean_threshold,
             commit_threshold: config.commit_threshold,
+            commit_entries: config.commit_entries,
             etl_config,
         }
     }
@@ -62,6 +67,7 @@ impl Default for StorageHashingStage {
         Self {
             clean_threshold: 500_000,
             commit_threshold: 100_000,
+            commit_entries: 30_000_000,
             etl_config: EtlConfig::default(),
         }
     }
@@ -93,14 +99,18 @@ where
             return Ok(ExecOutput::done(input.checkpoint().with_block_number(input.target())));
         }
 
-        let (from_block, to_block) = input.next_block_range().into_inner();
+        // Use the total remaining range to decide clean vs incremental.
+        let total_range = input.target() - input.checkpoint().block_number;
+        let from_block = input.next_block();
 
-        // if there are more blocks then threshold it is faster to go over Plain state and hash all
-        // account otherwise take changesets aggregate the sets and apply hashing to
-        // AccountHashing table. Also, if we start from genesis, we need to hash from scratch, as
-        // genesis accounts are not in changeset, along with their storages.
-        if to_block - from_block > self.clean_threshold || from_block == 1 {
-            // clear table, load all accounts and hash it
+        if total_range > self.clean_threshold || from_block == 1 {
+            // If there are more blocks than the threshold, it is faster to go over plain state
+            // and hash all storage; otherwise, take the changesets, aggregate the sets, and
+            // apply hashing to the HashedStorages table. Also, if we start from genesis, we
+            // need to hash from scratch, as genesis accounts are not in the changesets, along
+            // with their storages.
+
+            // clear table, load all entries and hash them
             tx.clear::<tables::HashedStorages>()?;
 
             let mut storage_cursor = tx.cursor_read::<tables::PlainStorageState>()?;
@@ -158,26 +168,58 @@ where
                     },
                 )?;
             }
+
+            let checkpoint = StageCheckpoint::new(input.target())
+                .with_storage_hashing_stage_checkpoint(StorageHashingCheckpoint {
+                    progress: stage_checkpoint_progress(provider)?,
+                    ..Default::default()
+                });
+
+            Ok(ExecOutput { checkpoint, done: true })
         } else {
-            // Aggregate all changesets and make list of storages that have been
-            // changed.
-            let lists = provider.changed_storages_with_range(from_block..=to_block)?;
-            // iterate over plain state and get newest storage value.
-            // Assumption we are okay with is that plain state represent
-            // `previous_stage_progress` state.
-            let storages = provider.plain_state_storages(lists)?;
+            // Stream changesets entry-by-entry, bounded by both block count
+            // (commit_threshold) and entry count (commit_entries), whichever comes first.
+            let BlockRangeOutput { block_range, is_final_range } =
+                input.next_block_range_with_threshold(self.commit_threshold);
+            let (from_block, to_block) = block_range.into_inner();
+
+            let mut changeset_cursor = tx.cursor_read::<tables::StorageChangeSets>()?;
+            let mut changed: BTreeMap<Address, BTreeSet<B256>> = BTreeMap::new();
+            let mut total_entries = 0u64;
+            let mut last_block = from_block;
+
+            for entry in
+                changeset_cursor.walk_range(BlockNumberAddress::range(from_block..=to_block))?
+            {
+                let (BlockNumberAddress((block_number, address)), storage_entry) = entry?;
+
+                // Check the entry limit only at block boundaries so we never
+                // checkpoint mid-block (which would skip the remaining entries
+                // for that block on the next invocation).
+                if block_number != last_block && total_entries >= self.commit_entries {
+                    break;
+                }
+
+                last_block = block_number;
+                changed.entry(address).or_default().insert(storage_entry.key);
+                total_entries += 1;
+            }
+
+            let storages = provider.plain_state_storages(changed)?;
             provider.insert_storage_for_hashing(storages)?;
+
+            let exhausted = total_entries < self.commit_entries;
+            let done = exhausted && is_final_range;
+            let progress_block = if exhausted { to_block } else { last_block };
+
+            let checkpoint = StageCheckpoint::new(progress_block)
+                .with_storage_hashing_stage_checkpoint(StorageHashingCheckpoint {
+                    progress: stage_checkpoint_progress(provider)?,
+                    ..Default::default()
+                });
+
+            Ok(ExecOutput { checkpoint, done })
         }
-
-        // We finished the hashing stage, no future iterations is expected for the same block range,
-        // so no checkpoint is needed.
-        let checkpoint = StageCheckpoint::new(input.target())
-            .with_storage_hashing_stage_checkpoint(StorageHashingCheckpoint {
-                progress: stage_checkpoint_progress(provider)?,
-                ..Default::default()
-            });
-
-        Ok(ExecOutput { checkpoint, done: true })
     }
 
     /// Unwind the stage.
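
Both stages close out an invocation with the same arithmetic: `exhausted` records whether the block-bounded sub-range was fully consumed, and the checkpoint lands either at the end of that sub-range or at the last complete block. A standalone sketch of just that decision (`stage_progress` is a hypothetical helper mirroring the patch's locals):

```rust
/// Compute (progress_block, done) for one stage invocation.
fn stage_progress(
    total_entries: u64,   // entries ingested by the bounded walk
    commit_entries: u64,  // entry budget
    to_block: u64,        // end of the commit_threshold-bounded sub-range
    last_block: u64,      // last fully processed block
    is_final_range: bool, // whether the sub-range reaches the target
) -> (u64, bool) {
    // The walk never hit the entry budget, so the whole sub-range was read.
    let exhausted = total_entries < commit_entries;
    // Done only when both bounds are satisfied in the same invocation.
    let done = exhausted && is_final_range;
    // Otherwise checkpoint at the last complete block; the next run
    // resumes with `from_block = progress_block + 1`.
    let progress_block = if exhausted { to_block } else { last_block };
    (progress_block, done)
}

fn main() {
    // Entry budget hit at block 1_042: checkpoint there, resume later.
    assert_eq!(stage_progress(30_000_000, 30_000_000, 1_100, 1_042, false), (1_042, false));
    // Budget untouched on the final sub-range: the stage is finished.
    assert_eq!(stage_progress(12_345, 30_000_000, 1_200, 1_200, true), (1_200, true));
}
```
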
@@ -323,6 +365,7 @@ mod tests {
         db: TestStageDB,
         commit_threshold: u64,
         clean_threshold: u64,
+        commit_entries: u64,
         etl_config: EtlConfig,
     }
 
@@ -332,6 +375,7 @@ mod tests {
                 db: TestStageDB::default(),
                 commit_threshold: 1000,
                 clean_threshold: 1000,
+                commit_entries: u64::MAX,
                 etl_config: EtlConfig::default(),
             }
         }
@@ -348,6 +392,7 @@ mod tests {
             Self::S {
                 commit_threshold: self.commit_threshold,
                 clean_threshold: self.clean_threshold,
+                commit_entries: self.commit_entries,
                 etl_config: self.etl_config.clone(),
             }
         }
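
For reference, the new field reaches the stages through `HashingConfig`, exactly as the patched `run.rs` does for CLI overrides. A sketch wiring up both stages with the patch's defaults (the `reth_stages::stages` re-export path is an assumption here):

```rust
use reth_config::config::{EtlConfig, HashingConfig};
use reth_stages::stages::{AccountHashingStage, StorageHashingStage};

fn hashing_stages(etl_config: EtlConfig) -> (AccountHashingStage, StorageHashingStage) {
    // Defaults from the patch: commit every 100k blocks or 30M changeset
    // entries, whichever comes first; rebuild from scratch past 500k blocks.
    let config = HashingConfig {
        clean_threshold: 500_000,
        commit_threshold: 100_000,
        commit_entries: 30_000_000,
    };
    (
        AccountHashingStage::new(config.clone(), etl_config.clone()),
        StorageHashingStage::new(config, etl_config),
    )
}
```

Tuning `commit_entries` down trades commit frequency for a lower peak size of the in-memory `BTreeSet`/`BTreeMap` of changed accounts and slots.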