From 7273ce802870e7d8a861f6848d4398286d0722d1 Mon Sep 17 00:00:00 2001
From: Alexey Shekhirin
Date: Mon, 22 May 2023 18:12:46 +0400
Subject: [PATCH] feat(stages): checkpoint hashing stages into a new table
 (#2735)

---
 bin/reth/src/stage/drop.rs                  |   2 -
 crates/primitives/src/checkpoints.rs        |  34 ++++
 crates/stages/src/stages/hashing_account.rs | 136 ++++++---------
 crates/stages/src/stages/hashing_storage.rs | 182 +++++++++-----------
 4 files changed, 174 insertions(+), 180 deletions(-)

diff --git a/bin/reth/src/stage/drop.rs b/bin/reth/src/stage/drop.rs
index 89533ab489..29cc9628d5 100644
--- a/bin/reth/src/stage/drop.rs
+++ b/bin/reth/src/stage/drop.rs
@@ -82,12 +82,10 @@ impl Command {
                 tool.db.update(|tx| {
                     // Clear hashed accounts
                     tx.clear::<tables::HashedAccount>()?;
-                    tx.put::<tables::SyncStageProgress>(ACCOUNT_HASHING.0.into(), Vec::new())?;
                     tx.put::<tables::SyncStage>(ACCOUNT_HASHING.0.to_string(), Default::default())?;

                     // Clear hashed storages
                     tx.clear::<tables::HashedStorage>()?;
-                    tx.put::<tables::SyncStageProgress>(STORAGE_HASHING.0.into(), Vec::new())?;
                     tx.put::<tables::SyncStage>(STORAGE_HASHING.0.to_string(), Default::default())?;

                     Ok::<_, eyre::Error>(())
diff --git a/crates/primitives/src/checkpoints.rs b/crates/primitives/src/checkpoints.rs
index bf1cfdafa5..ee826d6b04 100644
--- a/crates/primitives/src/checkpoints.rs
+++ b/crates/primitives/src/checkpoints.rs
@@ -138,6 +138,40 @@ impl StageCheckpoint {
     pub fn new(block_number: BlockNumber) -> Self {
         Self { block_number, ..Default::default() }
     }
+
+    /// Returns the account hashing stage checkpoint, if any.
+    pub fn account_hashing_stage_checkpoint(&self) -> Option<AccountHashingCheckpoint> {
+        match self.stage_checkpoint {
+            Some(StageUnitCheckpoint::Account(checkpoint)) => Some(checkpoint),
+            _ => None,
+        }
+    }
+
+    /// Returns the storage hashing stage checkpoint, if any.
+    pub fn storage_hashing_stage_checkpoint(&self) -> Option<StorageHashingCheckpoint> {
+        match self.stage_checkpoint {
+            Some(StageUnitCheckpoint::Storage(checkpoint)) => Some(checkpoint),
+            _ => None,
+        }
+    }
+
+    /// Sets the stage checkpoint to account hashing.
+    pub fn with_account_hashing_stage_checkpoint(
+        mut self,
+        checkpoint: AccountHashingCheckpoint,
+    ) -> Self {
+        self.stage_checkpoint = Some(StageUnitCheckpoint::Account(checkpoint));
+        self
+    }
+
+    /// Sets the stage checkpoint to storage hashing.
+    pub fn with_storage_hashing_stage_checkpoint(
+        mut self,
+        checkpoint: StorageHashingCheckpoint,
+    ) -> Self {
+        self.stage_checkpoint = Some(StageUnitCheckpoint::Storage(checkpoint));
+        self
+    }
 }

 // TODO(alexey): ideally, we'd want to display block number + stage-specific metric (if available)
diff --git a/crates/stages/src/stages/hashing_account.rs b/crates/stages/src/stages/hashing_account.rs
index a48b2d97a0..ba58dc530e 100644
--- a/crates/stages/src/stages/hashing_account.rs
+++ b/crates/stages/src/stages/hashing_account.rs
@@ -1,7 +1,6 @@
 use crate::{ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput, UnwindOutput};
 use itertools::Itertools;
 use rayon::slice::ParallelSliceMut;
-use reth_codecs::Compact;
 use reth_db::{
     cursor::{DbCursorRO, DbCursorRW},
     database::Database,
@@ -39,43 +38,6 @@ impl Default for AccountHashingStage {
     }
 }

-impl AccountHashingStage {
-    /// Saves the hashing progress
-    pub fn save_checkpoint<DB: Database>(
-        &mut self,
-        tx: &Transaction<'_, DB>,
-        checkpoint: AccountHashingCheckpoint,
-    ) -> Result<(), StageError> {
-        debug!(target: "sync::stages::account_hashing::exec", checkpoint = ?checkpoint, "Saving inner account hashing checkpoint");
-
-        let mut buf = vec![];
-        checkpoint.to_compact(&mut buf);
-
-        Ok(tx.put::<tables::SyncStageProgress>(ACCOUNT_HASHING.0.into(), buf)?)
-    }
-
-    /// Gets the hashing progress
-    pub fn get_checkpoint<DB: Database>(
-        &self,
-        tx: &Transaction<'_, DB>,
-    ) -> Result<AccountHashingCheckpoint, StageError> {
-        let buf =
-            tx.get::<tables::SyncStageProgress>(ACCOUNT_HASHING.0.into())?.unwrap_or_default();
-
-        if buf.is_empty() {
-            return Ok(AccountHashingCheckpoint::default())
-        }
-
-        let (checkpoint, _) = AccountHashingCheckpoint::from_compact(&buf, buf.len());
-
-        if checkpoint.address.is_some() {
-            debug!(target: "sync::stages::account_hashing::exec", checkpoint = ?checkpoint, "Continuing inner account hashing checkpoint");
-        }
-
-        Ok(checkpoint)
-    }
-}
-
 // TODO: Rewrite this
 /// `SeedOpts` provides configuration parameters for calling `AccountHashingStage::seed`
 /// in unit tests or benchmarks to generate an initial database state for running the
@@ -173,22 +135,31 @@ impl<DB: Database> Stage<DB> for AccountHashingStage {
         // AccountHashing table. Also, if we start from genesis, we need to hash from scratch, as
         // genesis accounts are not in changeset.
         if to_block - from_block > self.clean_threshold || from_block == 1 {
-            let mut checkpoint = self.get_checkpoint(tx)?;
+            let stage_checkpoint = input
+                .checkpoint
+                .and_then(|checkpoint| checkpoint.account_hashing_stage_checkpoint());

-            if checkpoint.address.is_none() ||
-                // Checkpoint is no longer valid if the range of transitions changed.
-                // An already hashed account may have been changed with the new range, and therefore should be hashed again.
-                checkpoint.from != from_block ||
-                checkpoint.to != to_block
-            {
-                // clear table, load all accounts and hash it
-                tx.clear::<tables::HashedAccount>()?;
+            let start_address = match stage_checkpoint {
+                Some(AccountHashingCheckpoint { address: address @ Some(_), from, to })
+                    // Checkpoint is only valid if the range of transitions didn't change.
+                    // An already hashed account may have been changed with the new range,
+                    // and therefore should be hashed again.
+                    if from == from_block && to == to_block =>
+                {
+                    debug!(target: "sync::stages::account_hashing::exec", checkpoint = ?stage_checkpoint, "Continuing inner account hashing checkpoint");

-                checkpoint = AccountHashingCheckpoint::default();
-                self.save_checkpoint(tx, checkpoint)?;
+                    address
+                }
+                _ => {
+                    // clear table, load all accounts and hash it
+                    tx.clear::<tables::HashedAccount>()?;
+
+                    None
+                }
             }
+            .take()
+            .map(RawKey::new);

-            let start_address = checkpoint.address.take().map(RawKey::new);
             let next_address = {
                 let mut accounts_cursor = tx.cursor_read::<RawTable<tables::PlainAccountState>>()?;
@@ -245,18 +216,15 @@ impl<DB: Database> Stage<DB> for AccountHashingStage {
             };

             if let Some((next_address, _)) = &next_address {
-                checkpoint.address = Some(next_address.key().unwrap());
-                checkpoint.from = from_block;
-                checkpoint.to = to_block;
-            }
-
-            self.save_checkpoint(tx, checkpoint)?;
-
-            if next_address.is_some() {
-                // from block is correct here as were are iteration over state for this
-                // particular block
-                info!(target: "sync::stages::hashing_account", stage_progress = %input.checkpoint(), is_final_range = false, "Stage iteration finished");
-                return Ok(ExecOutput { checkpoint: input.checkpoint(), done: false })
+                let checkpoint = input.checkpoint().with_account_hashing_stage_checkpoint(
+                    AccountHashingCheckpoint {
+                        address: Some(next_address.key().unwrap()),
+                        from: from_block,
+                        to: to_block,
+                    },
+                );
+                info!(target: "sync::stages::hashing_account", stage_progress = %checkpoint, is_final_range = false, "Stage iteration finished");
+                return Ok(ExecOutput { checkpoint, done: false })
             }
         } else {
             // Aggregate all transition changesets and and make list of account that have been
@@ -270,8 +238,12 @@ impl<DB: Database> Stage<DB> for AccountHashingStage {
             tx.insert_account_for_hashing(accounts.into_iter())?;
         }

-        info!(target: "sync::stages::hashing_account", stage_progress = %input.previous_stage_checkpoint(), is_final_range = true, "Stage iteration finished");
-        Ok(ExecOutput { checkpoint: input.previous_stage_checkpoint(), done: true })
+        // We finished the hashing stage, no future iterations is expected for the same block range,
+        // so no checkpoint is needed.
+        let checkpoint = input.previous_stage_checkpoint();
+
+        info!(target: "sync::stages::hashing_account", stage_progress = %checkpoint, is_final_range = true, "Stage iteration finished");
+        Ok(ExecOutput { checkpoint, done: true })
     }

     /// Unwind the stage.
@@ -295,11 +267,11 @@ impl<DB: Database> Stage<DB> for AccountHashingStage {
 mod tests {
     use super::*;
     use crate::test_utils::{
-        stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, TestRunnerError,
-        UnwindStageTestRunner, PREV_STAGE_ID,
+        stage_test_suite_ext, ExecuteStageTestRunner, TestRunnerError, UnwindStageTestRunner,
+        PREV_STAGE_ID,
     };
     use assert_matches::assert_matches;
-    use reth_primitives::{Account, U256};
+    use reth_primitives::{Account, StageUnitCheckpoint, U256};
     use test_utils::*;

     stage_test_suite_ext!(AccountHashingTestRunner, account_hashing);
@@ -342,15 +314,10 @@ mod tests {

         runner.seed_execution(input).expect("failed to seed execution");

-        // first run, hash first five account.
+        // first run, hash first five accounts.
         let rx = runner.execute(input);
         let result = rx.await.unwrap();

-        assert_matches!(
-            result,
-            Ok(ExecOutput { checkpoint: StageCheckpoint { block_number: 10, .. }, done: false })
-        );
-        assert_eq!(runner.tx.table::<tables::HashedAccount>().unwrap().len(), 5);
         let fifth_address = runner
             .tx
             .query(|tx| {
@@ -364,20 +331,31 @@ mod tests {
             })
             .unwrap();

-        let stage_progress = runner.stage().get_checkpoint(&runner.tx.inner()).unwrap();
-        assert_eq!(
-            stage_progress,
-            AccountHashingCheckpoint { address: Some(fifth_address), from: 11, to: 20 }
+        assert_matches!(
+            result,
+            Ok(ExecOutput {
+                checkpoint: StageCheckpoint {
+                    block_number: 10,
+                    stage_checkpoint: Some(StageUnitCheckpoint::Account(
+                        AccountHashingCheckpoint { address: Some(address), from: 11, to: 20 }
+                    ))
+                },
+                done: false
+            }) if address == fifth_address
         );
+        assert_eq!(runner.tx.table::<tables::HashedAccount>().unwrap().len(), 5);

-        // second run, hash next five account.
+        // second run, hash next five accounts.
         input.checkpoint = Some(result.unwrap().checkpoint);
         let rx = runner.execute(input);
         let result = rx.await.unwrap();

         assert_matches!(
             result,
-            Ok(ExecOutput { checkpoint: StageCheckpoint { block_number: 20, .. }, done: true })
+            Ok(ExecOutput {
+                checkpoint: StageCheckpoint { block_number: 20, stage_checkpoint: None },
+                done: true
+            })
         );
         assert_eq!(runner.tx.table::<tables::HashedAccount>().unwrap().len(), 10);

diff --git a/crates/stages/src/stages/hashing_storage.rs b/crates/stages/src/stages/hashing_storage.rs
index 321328ebb6..a5fa4fdab7 100644
--- a/crates/stages/src/stages/hashing_storage.rs
+++ b/crates/stages/src/stages/hashing_storage.rs
@@ -1,6 +1,5 @@
 use crate::{ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput, UnwindOutput};
 use num_traits::Zero;
-use reth_codecs::Compact;
 use reth_db::{
     cursor::DbDupCursorRO,
     database::Database,
@@ -33,43 +32,6 @@ impl Default for StorageHashingStage {
     }
 }

-impl StorageHashingStage {
-    /// Saves the hashing progress
-    pub fn save_checkpoint<DB: Database>(
-        &mut self,
-        tx: &Transaction<'_, DB>,
-        checkpoint: StorageHashingCheckpoint,
-    ) -> Result<(), StageError> {
-        debug!(target: "sync::stages::storage_hashing::exec", checkpoint = ?checkpoint, "Saving inner storage hashing checkpoint");
-
-        let mut buf = vec![];
-        checkpoint.to_compact(&mut buf);
-
-        Ok(tx.put::<tables::SyncStageProgress>(STORAGE_HASHING.0.into(), buf)?)
-    }
-
-    /// Gets the hashing progress
-    pub fn get_checkpoint<DB: Database>(
-        &self,
-        tx: &Transaction<'_, DB>,
-    ) -> Result<StorageHashingCheckpoint, StageError> {
-        let buf =
-            tx.get::<tables::SyncStageProgress>(STORAGE_HASHING.0.into())?.unwrap_or_default();
-
-        if buf.is_empty() {
-            return Ok(StorageHashingCheckpoint::default())
-        }
-
-        let (checkpoint, _) = StorageHashingCheckpoint::from_compact(&buf, buf.len());
-
-        if checkpoint.address.is_some() {
-            debug!(target: "sync::stages::storage_hashing::exec", checkpoint = ?checkpoint, "Continuing inner storage hashing checkpoint");
-        }
-
-        Ok(checkpoint)
-    }
-}
-
 #[async_trait::async_trait]
 impl<DB: Database> Stage<DB> for StorageHashingStage {
     /// Return the id of the stage
@@ -94,22 +56,34 @@ impl<DB: Database> Stage<DB> for StorageHashingStage {
         // AccountHashing table. Also, if we start from genesis, we need to hash from scratch, as
         // genesis accounts are not in changeset, along with their storages.
         if to_block - from_block > self.clean_threshold || from_block == 1 {
-            let mut checkpoint = self.get_checkpoint(tx)?;
+            let stage_checkpoint = input
+                .checkpoint
+                .and_then(|checkpoint| checkpoint.storage_hashing_stage_checkpoint());

-            if checkpoint.address.is_none() ||
-                // Checkpoint is no longer valid if the range of blocks changed.
-                // An already hashed storage may have been changed with the new range, and therefore should be hashed again.
-                checkpoint.to != to_block ||
-                checkpoint.from != from_block
-            {
-                tx.clear::<tables::HashedStorage>()?;
+            let (mut current_key, mut current_subkey) = match stage_checkpoint {
+                Some(StorageHashingCheckpoint {
+                    address: address @ Some(_),
+                    storage,
+                    from,
+                    to ,
+                })
+                // Checkpoint is only valid if the range of transitions didn't change.
+                // An already hashed storage may have been changed with the new range,
+                // and therefore should be hashed again.
+                if from == from_block && to == to_block =>
+                {
+                    debug!(target: "sync::stages::storage_hashing::exec", checkpoint = ?stage_checkpoint, "Continuing inner storage hashing checkpoint");

-                checkpoint = StorageHashingCheckpoint::default();
-                self.save_checkpoint(tx, checkpoint)?;
-            }
+                    (address, storage)
+                }
+                _ => {
+                    // clear table, load all accounts and hash it
+                    tx.clear::<tables::HashedStorage>()?;
+
+                    (None, None)
+                }
+            };

-            let mut current_key = checkpoint.address.take();
-            let mut current_subkey = checkpoint.storage.take();
             let mut keccak_address = None;
             let mut hashed_batch = BTreeMap::new();
@@ -169,20 +143,17 @@ impl<DB: Database> Stage<DB> for StorageHashingStage {
                     tx.put::<tables::HashedStorage>(addr, StorageEntry { key, value })
                 })?;

-            if let Some(address) = &current_key {
-                checkpoint.address = Some(*address);
-                checkpoint.storage = current_subkey;
-                checkpoint.from = from_block;
-                checkpoint.to = to_block;
-            }
-
-            self.save_checkpoint(tx, checkpoint)?;
-
-            if current_key.is_some() {
-                // `from_block` is correct here as were are iteration over state for this
-                // particular block.
-                info!(target: "sync::stages::hashing_storage", stage_progress = %input.checkpoint(), is_final_range = false, "Stage iteration finished");
-                return Ok(ExecOutput { checkpoint: input.checkpoint(), done: false })
+            if current_key.is_some() {
+                let checkpoint = input.checkpoint().with_storage_hashing_stage_checkpoint(
+                    StorageHashingCheckpoint {
+                        address: current_key,
+                        storage: current_subkey,
+                        from: from_block,
+                        to: to_block,
+                    },
+                );
+                info!(target: "sync::stages::hashing_storage", stage_progress = %checkpoint, is_final_range = false, "Stage iteration finished");
+                return Ok(ExecOutput { checkpoint, done: false })
             }
         } else {
             // Aggregate all changesets and and make list of storages that have been
@@ -195,8 +166,12 @@ impl<DB: Database> Stage<DB> for StorageHashingStage {
             tx.insert_storage_for_hashing(storages.into_iter())?;
         }

-        info!(target: "sync::stages::hashing_storage", stage_progress = %input.previous_stage_checkpoint(), is_final_range = true, "Stage iteration finished");
-        Ok(ExecOutput { checkpoint: input.previous_stage_checkpoint(), done: true })
+        // We finished the hashing stage, no future iterations is expected for the same block range,
+        // so no checkpoint is needed.
+        let checkpoint = input.previous_stage_checkpoint();
+
+        info!(target: "sync::stages::hashing_storage", stage_progress = %checkpoint, is_final_range = true, "Stage iteration finished");
+        Ok(ExecOutput { checkpoint, done: true })
     }

     /// Unwind the stage.
@@ -231,7 +206,7 @@ mod tests {
     use reth_interfaces::test_utils::generators::{
         random_block_range, random_contract_account_range,
     };
-    use reth_primitives::{Address, SealedBlock, StorageEntry, H256, U256};
+    use reth_primitives::{Address, SealedBlock, StageUnitCheckpoint, StorageEntry, H256, U256};

     stage_test_suite_ext!(StorageHashingTestRunner, storage_hashing);

@@ -250,7 +225,7 @@ mod tests {
         // hang on one key. Seed execution inserts more than one storage entry per address.
         runner.set_commit_threshold(1);

-        let input = ExecInput {
+        let mut input = ExecInput {
             previous_stage: Some((PREV_STAGE_ID, StageCheckpoint::new(previous_stage))),
             checkpoint: Some(StageCheckpoint::new(stage_progress)),
         };
@@ -261,6 +236,7 @@ mod tests {
         if let Ok(result) = runner.execute(input).await.unwrap() {
             if !result.done {
                 // Continue from checkpoint
+                input.checkpoint = Some(result.checkpoint);
                 continue
             } else {
                 assert!(result.checkpoint.block_number == previous_stage);
@@ -296,11 +272,7 @@ mod tests {
         // first run, hash first half of storages.
         let rx = runner.execute(input);
         let result = rx.await.unwrap();
-        assert_matches!(
-            result,
-            Ok(ExecOutput { checkpoint: StageCheckpoint { block_number: 100, .. }, done: false })
-        );
-        assert_eq!(runner.tx.table::<tables::HashedStorage>().unwrap().len(), 500);
+
         let (progress_address, progress_key) = runner
             .tx
             .query(|tx| {
@@ -314,27 +286,29 @@ mod tests {
             })
             .unwrap();

-        let stage_progress = runner.stage().get_checkpoint(&runner.tx.inner()).unwrap();
-        let progress_key = stage_progress.storage.map(|_| progress_key);
-        assert_eq!(
-            stage_progress,
-            StorageHashingCheckpoint {
-                address: Some(progress_address),
-                storage: progress_key,
-                from: 101,
-                to: 500
-            }
+        assert_matches!(
+            result,
+            Ok(ExecOutput {
+                checkpoint: StageCheckpoint {
+                    block_number: 100,
+                    stage_checkpoint: Some(StageUnitCheckpoint::Storage(StorageHashingCheckpoint {
+                        address: Some(address),
+                        storage: Some(storage),
+                        from: 101,
+                        to: 500
+                    }))
+                },
+                done: false
+            }) if address == progress_address && storage == progress_key
         );
+        assert_eq!(runner.tx.table::<tables::HashedStorage>().unwrap().len(), 500);

         // second run with commit threshold of 2 to check if subkey is set.
         runner.set_commit_threshold(2);
+        input.checkpoint = Some(result.unwrap().checkpoint);
         let rx = runner.execute(input);
         let result = rx.await.unwrap();
-        assert_matches!(
-            result,
-            Ok(ExecOutput { checkpoint: StageCheckpoint { block_number: 100, .. }, done: false })
-        );
-        assert_eq!(runner.tx.table::<tables::HashedStorage>().unwrap().len(), 502);
+
         let (progress_address, progress_key) = runner
             .tx
             .query(|tx| {
@@ -348,17 +322,24 @@ mod tests {
             })
             .unwrap();

-        let stage_progress = runner.stage().get_checkpoint(&runner.tx.inner()).unwrap();
-        let progress_key = stage_progress.storage.map(|_| progress_key);
-        assert_eq!(
-            stage_progress,
-            StorageHashingCheckpoint {
-                address: Some(progress_address),
-                storage: progress_key,
-                from: 101,
-                to: 500
-            }
+        assert_matches!(
+            result,
+            Ok(ExecOutput {
+                checkpoint: StageCheckpoint {
+                    block_number: 100,
+                    stage_checkpoint: Some(StageUnitCheckpoint::Storage(
+                        StorageHashingCheckpoint {
+                            address: Some(address),
+                            storage: Some(storage),
+                            from: 101,
+                            to: 500,
+                        }
+                    ))
+                },
+                done: false
+            }) if address == progress_address && storage == progress_key
         );
+        assert_eq!(runner.tx.table::<tables::HashedStorage>().unwrap().len(), 502);

         // third last run, hash rest of storages.
         runner.set_commit_threshold(1000);
@@ -368,7 +349,10 @@ mod tests {

         assert_matches!(
             result,
-            Ok(ExecOutput { checkpoint: StageCheckpoint { block_number: 500, .. }, done: true })
+            Ok(ExecOutput {
+                checkpoint: StageCheckpoint { block_number: 500, stage_checkpoint: None },
+                done: true
+            })
         );
         assert_eq!(
             runner.tx.table::<tables::HashedStorage>().unwrap().len(),