diff --git a/crates/stages/src/stages/bodies.rs b/crates/stages/src/stages/bodies.rs index 6ba054cd69..132019b66e 100644 --- a/crates/stages/src/stages/bodies.rs +++ b/crates/stages/src/stages/bodies.rs @@ -328,7 +328,11 @@ mod tests { output, Ok(ExecOutput { stage_progress, done: true }) if stage_progress > first_run_progress ); - assert!(runner.validate_execution(input, output.ok()).is_ok(), "execution validation"); + assert_matches!( + runner.validate_execution(input, output.ok()), + Ok(_), + "execution validation" + ); } /// Checks that the stage unwinds correctly, even if a transaction in a block is missing. @@ -382,7 +386,7 @@ mod tests { Ok(UnwindOutput { stage_progress }) if stage_progress == 1 ); - assert!(runner.validate_unwind(input).is_ok(), "unwind validation"); + assert_matches!(runner.validate_unwind(input), Ok(_), "unwind validation"); } mod test_utils { diff --git a/crates/stages/src/stages/hashing_account.rs b/crates/stages/src/stages/hashing_account.rs index 2b4746ace1..566d140395 100644 --- a/crates/stages/src/stages/hashing_account.rs +++ b/crates/stages/src/stages/hashing_account.rs @@ -2,7 +2,7 @@ use crate::{ db::Transaction, ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput, UnwindOutput, }; use reth_db::{ - cursor::DbCursorRO, + cursor::{DbCursorRO, DbCursorRW}, database::Database, tables, transaction::{DbTx, DbTxMut}, @@ -91,6 +91,9 @@ impl Stage for AccountHashingStage { break } } else { + let mut plain_accounts = tx.cursor_read::()?; + let mut hashed_accounts = tx.cursor_write::()?; + // Aggregate all transition changesets and and make list of account that have been // changed. tx.cursor_read::()? @@ -106,16 +109,19 @@ impl Stage for AccountHashingStage { // iterate over plain state and get newest value. // Assumption we are okay to make is that plainstate represent // `previous_stage_progress` state. - .map(|address| tx.get::(address).map(|a| (address, a))) + .map(|address| { + plain_accounts.seek_exact(address).map(|a| (address, a.map(|(_, v)| v))) + }) .collect::, _>>()? .into_iter() - .try_for_each(|(address, account)| { + .try_for_each(|(address, account)| -> Result<(), StageError> { let hashed_address = keccak256(address); if let Some(account) = account { - tx.put::(hashed_address, account) - } else { - tx.delete::(hashed_address, None).map(|_| ()) + hashed_accounts.upsert(hashed_address, account)? + } else if hashed_accounts.seek_exact(hashed_address)?.is_some() { + hashed_accounts.delete_current()?; } + Ok(()) })?; } @@ -135,6 +141,8 @@ impl Stage for AccountHashingStage { let from_transition_rev = tx.get_block_transition(input.unwind_to)?; let to_transition_rev = tx.get_block_transition(input.stage_progress)?; + let mut hashed_accounts = tx.cursor_write::()?; + // Aggregate all transition changesets and and make list of account that have been changed. tx.cursor_read::()? .walk_range(from_transition_rev..to_transition_rev)? @@ -156,12 +164,13 @@ impl Stage for AccountHashingStage { .collect::>() .into_iter() // Apply values to HashedState (if Account is None remove it); - .try_for_each(|(hashed_address, account)| { + .try_for_each(|(hashed_address, account)| -> Result<(), StageError> { if let Some(account) = account { - tx.put::(hashed_address, account) - } else { - tx.delete::(hashed_address, None).map(|_| ()) + hashed_accounts.upsert(hashed_address, account)?; + } else if hashed_accounts.seek_exact(hashed_address)?.is_some() { + hashed_accounts.delete_current()?; } + Ok(()) })?; Ok(UnwindOutput { stage_progress: input.unwind_to }) diff --git a/crates/stages/src/stages/hashing_storage.rs b/crates/stages/src/stages/hashing_storage.rs index 5caf9729de..817115c731 100644 --- a/crates/stages/src/stages/hashing_storage.rs +++ b/crates/stages/src/stages/hashing_storage.rs @@ -2,14 +2,17 @@ use crate::{ db::Transaction, ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput, UnwindOutput, }; use reth_db::{ - cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO, DbDupCursorRW}, + cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO}, database::Database, models::TransitionIdAddress, tables, transaction::{DbTx, DbTxMut}, }; use reth_primitives::{keccak256, Address, StorageEntry, H160, H256, U256}; -use std::{collections::BTreeMap, fmt::Debug}; +use std::{ + collections::{BTreeMap, BTreeSet}, + fmt::Debug, +}; use tracing::*; /// The [`StageId`] of the storage hashing stage. @@ -49,8 +52,8 @@ impl Stage for StorageHashingStage { let previous_stage_progress = input.previous_stage_progress(); // read storage changeset, merge it into one changeset and calculate storage hashes. - let from_transition = tx.get_block_transition(stage_progress)? + 1; - let to_transition = tx.get_block_transition(previous_stage_progress)? + 1; + let from_transition = tx.get_block_transition(stage_progress)?; + let to_transition = tx.get_block_transition(previous_stage_progress)?; // if there are more blocks then threshold it is faster to go over Plain state and hash all // account otherwise take changesets aggregate the sets and apply hashing to @@ -95,6 +98,7 @@ impl Stage for StorageHashingStage { } } else { let mut plain_storage = tx.cursor_dup_read::()?; + let mut hashed_storage = tx.cursor_dup_write::()?; // Aggregate all transition changesets and and make list of storages that have been // changed. @@ -109,12 +113,9 @@ impl Stage for StorageHashingStage { // it is needed as it is dup table. .fold( BTreeMap::new(), - |mut accounts: BTreeMap>, + |mut accounts: BTreeMap>, (TransitionIdAddress((_, address)), storage_entry)| { - accounts - .entry(address) - .or_default() - .insert(storage_entry.key, storage_entry.value); + accounts.entry(address).or_default().insert(storage_entry.key); accounts }, ) @@ -125,10 +126,10 @@ impl Stage for StorageHashingStage { .map(|(address, storage)| { storage .into_iter() - .map(|(key, val)| { + .map(|key| { plain_storage .seek_by_key_subkey(address, key) - .map(|ret| (keccak256(key), (val, ret.map(|e| e.value)))) + .map(|ret| (keccak256(key), ret.map(|e| e.value))) }) .collect::, _>>() .map(|storage| (keccak256(address), storage)) @@ -138,19 +139,20 @@ impl Stage for StorageHashingStage { // Hash the address and key and apply them to HashedStorage (if Storage is None // just remove it); .try_for_each(|(address, storage)| { - storage.into_iter().try_for_each( - |(key, (old_val, new_val))| -> Result<(), StageError> { - tx.delete::( - address, - Some(StorageEntry { key, value: old_val }), - )?; - if let Some(value) = new_val { - let val = StorageEntry { key, value }; - tx.put::(address, val)? - } - Ok(()) - }, - ) + storage.into_iter().try_for_each(|(key, val)| -> Result<(), StageError> { + if hashed_storage + .seek_by_key_subkey(address, key)? + .filter(|entry| entry.key == key) + .is_some() + { + hashed_storage.delete_current()?; + } + + if let Some(value) = val { + hashed_storage.upsert(address, StorageEntry { key, value })?; + } + Ok(()) + }) })?; } @@ -164,8 +166,8 @@ impl Stage for StorageHashingStage { tx: &mut Transaction<'_, DB>, input: UnwindInput, ) -> Result { - let from_transition_rev = tx.get_block_transition(input.unwind_to)? + 1; - let to_transition_rev = tx.get_block_transition(input.stage_progress)? + 1; + let from_transition_rev = tx.get_block_transition(input.unwind_to)?; + let to_transition_rev = tx.get_block_transition(input.stage_progress)?; let mut hashed_storage = tx.cursor_dup_write::()?; @@ -193,13 +195,18 @@ impl Stage for StorageHashingStage { .map(|((address, key), value)| ((keccak256(address), keccak256(key)), value)) .collect::>() .into_iter() - // Apply values to HashedStorage (if Value is zero remove it); + // Apply values to HashedStorage (if Value is zero just remove it); .try_for_each(|((address, key), value)| -> Result<(), StageError> { - hashed_storage.seek_by_key_subkey(address, key)?; - hashed_storage.delete_current()?; + if hashed_storage + .seek_by_key_subkey(address, key)? + .filter(|entry| entry.key == key) + .is_some() + { + hashed_storage.delete_current()?; + } if value != U256::ZERO { - hashed_storage.append_dup(address, StorageEntry { key, value })?; + hashed_storage.upsert(address, StorageEntry { key, value })?; } Ok(()) })?; @@ -317,8 +324,6 @@ mod tests { tx.put::(transaction.hash(), tx_id)?; tx.put::(tx_id, transaction.clone())?; tx.put::(tx_id, transition_id)?; - tx_id += 1; - transition_id += 1; let (addr, _) = accounts .get_mut(rand::random::() % n_accounts as usize) @@ -333,13 +338,15 @@ mod tests { (transition_id, *addr).into(), new_entry, progress.header.number == stage_progress, - ) + )?; + tx_id += 1; + transition_id += 1; + Ok(()) })?; // Randomize rewards let has_reward: bool = rand::random(); if has_reward { - transition_id += 1; self.insert_storage_entry( tx, (transition_id, Address::random()).into(), @@ -349,6 +356,7 @@ mod tests { }, progress.header.number == stage_progress, )?; + transition_id += 1; } tx.put::(key.number(), transition_id)?; @@ -442,7 +450,7 @@ mod tests { if let Some(e) = tx .cursor_dup_write::()? .seek_by_key_subkey(hashed_address, hashed_entry.key)? - .and_then(|e| if e.key == hashed_entry.key { Some(e) } else { None }) + .filter(|e| e.key == hashed_entry.key) { tx.delete::(hashed_address, Some(e)) .expect("failed to delete entry"); @@ -470,15 +478,20 @@ mod tests { let mut rev_changeset_walker = changeset_cursor.walk_back(None)?; while let Some((tid_address, entry)) = rev_changeset_walker.next().transpose()? { - if tid_address.transition_id() <= target_transition { + if tid_address.transition_id() < target_transition { break } - storage_cursor.seek_by_key_subkey(tid_address.address(), entry.key)?; - storage_cursor.delete_current()?; + if storage_cursor + .seek_by_key_subkey(tid_address.address(), entry.key)? + .filter(|e| e.key == entry.key) + .is_some() + { + storage_cursor.delete_current()?; + } if entry.value != U256::ZERO { - tx.put::(tid_address.address(), entry)?; + storage_cursor.upsert(tid_address.address(), entry)?; } } Ok(()) diff --git a/crates/stages/src/test_utils/macros.rs b/crates/stages/src/test_utils/macros.rs index 56f6a561a2..ff19be6fec 100644 --- a/crates/stages/src/test_utils/macros.rs +++ b/crates/stages/src/test_utils/macros.rs @@ -18,7 +18,7 @@ macro_rules! stage_test_suite { assert_matches::assert_matches!(result, Ok(_)); // Validate the stage execution - assert!(runner.validate_execution(input, result.unwrap().ok()).is_ok(), "execution validation"); + assert_matches::assert_matches!(runner.validate_execution(input, result.unwrap().ok()),Ok(_), "execution validation"); } // Run the complete stage execution flow. @@ -47,7 +47,7 @@ macro_rules! stage_test_suite { ); // Validate the stage execution - assert!(runner.validate_execution(input, result.ok()).is_ok(), "execution validation"); + assert_matches::assert_matches!(runner.validate_execution(input, result.ok()),Ok(_), "execution validation"); } // Check that unwind does not panic on no new entries within the input range. @@ -70,7 +70,7 @@ macro_rules! stage_test_suite { ); // Validate the stage unwind - assert!(runner.validate_unwind(input).is_ok(), "unwind validation"); + assert_matches::assert_matches!(runner.validate_unwind(input),Ok(_), "unwind validation"); } // Run complete execute and unwind flow. @@ -97,7 +97,7 @@ macro_rules! stage_test_suite { Ok(ExecOutput { done, stage_progress }) if done && stage_progress == previous_stage ); - assert!(runner.validate_execution(execute_input, result.ok()).is_ok(), "execution validation"); + assert_matches::assert_matches!(runner.validate_execution(execute_input, result.ok()),Ok(_), "execution validation"); // Run stage unwind @@ -115,7 +115,7 @@ macro_rules! stage_test_suite { ); // Validate the stage unwind - assert!(runner.validate_unwind(unwind_input).is_ok(), "unwind validation"); + assert_matches::assert_matches!(runner.validate_unwind(unwind_input),Ok(_), "unwind validation"); } } }; @@ -156,7 +156,7 @@ macro_rules! stage_test_suite_ext { ); // Validate the stage execution - assert!(runner.validate_execution(input, result.ok()).is_ok(), "execution validation"); + assert_matches::assert_matches!(runner.validate_execution(input, result.ok()),Ok(_), "execution validation"); } } };