diff --git a/bin/reth/src/node/mod.rs b/bin/reth/src/node/mod.rs
index 7169993283..45bf5d7eb8 100644
--- a/bin/reth/src/node/mod.rs
+++ b/bin/reth/src/node/mod.rs
@@ -23,7 +23,8 @@ use reth_primitives::{BlockNumber, ChainSpec, NodeRecord, H256};
 use reth_stages::{
     metrics::HeaderMetrics,
     stages::{
-        bodies::BodyStage, execution::ExecutionStage, headers::HeaderStage,
+        bodies::BodyStage, execution::ExecutionStage, hashing_account::AccountHashingStage,
+        hashing_storage::StorageHashingStage, headers::HeaderStage, merkle::MerkleStage,
         sender_recovery::SenderRecoveryStage, total_difficulty::TotalDifficultyStage,
     },
     PipelineEvent, StageId,
@@ -181,7 +182,13 @@ impl Command {
                 .push(ExecutionStage {
                     chain_spec: self.chain.clone(),
                     commit_threshold: config.stages.execution.commit_threshold,
-                });
+                })
+                // This Merkle stage is used only on unwind
+                .push(MerkleStage { is_execute: false })
+                .push(AccountHashingStage { clean_threshold: 500_000, commit_threshold: 100_000 })
+                .push(StorageHashingStage { clean_threshold: 500_000, commit_threshold: 100_000 })
+                // This Merkle stage is used only on execute
+                .push(MerkleStage { is_execute: true });
 
         if let Some(tip) = self.tip {
             debug!(target: "reth::cli", %tip, "Tip manually set");
diff --git a/crates/interfaces/src/test_utils/generators.rs b/crates/interfaces/src/test_utils/generators.rs
index 8624478f5c..d6e6c2e650 100644
--- a/crates/interfaces/src/test_utils/generators.rs
+++ b/crates/interfaces/src/test_utils/generators.rs
@@ -1,7 +1,7 @@
 use rand::{distributions::uniform::SampleRange, thread_rng, Rng};
 use reth_primitives::{
-    proofs, Address, Bytes, Header, SealedBlock, SealedHeader, Signature, Transaction,
-    TransactionKind, TransactionSigned, TxLegacy, H256, U256,
+    proofs, Account, Address, Bytes, Header, SealedBlock, SealedHeader, Signature, Transaction,
+    TransactionKind, TransactionSigned, TxLegacy, H160, H256, U256,
 };
 use secp256k1::{KeyPair, Message as SecpMessage, Secp256k1, SecretKey};
 
@@ -164,6 +164,24 @@ pub fn random_block_range(
     blocks
 }
 
+/// Generate a random Externally Owned Account (an EOA has no contract code).
+pub fn random_eoa_account() -> (Address, Account) {
+    let nonce: u64 = rand::random();
+    let balance = U256::from(rand::random::<u64>());
+    let addr = H160::from(rand::random::<u64>());
+
+    (addr, Account { nonce, balance, bytecode_hash: None })
+}
+
+/// Generate a range of random Externally Owned Accounts.
+pub fn random_eoa_account_range(acc_range: &mut std::ops::Range<u64>) -> Vec<(Address, Account)> {
+    let mut accounts = Vec::with_capacity(acc_range.end.saturating_sub(acc_range.start) as usize);
+    for _ in acc_range {
+        accounts.push(random_eoa_account())
+    }
+    accounts
+}
+
 #[cfg(test)]
 mod test {
     use std::str::FromStr;
diff --git a/crates/stages/src/stages/hashing_account.rs b/crates/stages/src/stages/hashing_account.rs
new file mode 100644
index 0000000000..06b1e55405
--- /dev/null
+++ b/crates/stages/src/stages/hashing_account.rs
@@ -0,0 +1,398 @@
+use crate::{
+    db::Transaction, ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput, UnwindOutput,
+};
+use reth_db::{
+    cursor::{DbCursorRO, DbCursorRW},
+    database::Database,
+    tables,
+    transaction::{DbTx, DbTxMut},
+};
+use reth_primitives::{keccak256, Account, Address, H160};
+use std::{
+    collections::{BTreeMap, BTreeSet},
+    fmt::Debug,
+};
+use tracing::*;
+
+const ACCOUNT_HASHING: StageId = StageId("AccountHashingStage");
+
+/// Account hashing stage hashes plain accounts.
+/// This is a preparation step before generating the intermediate hashes and calculating the
+/// Merkle tree root.
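+///
+/// A minimal construction sketch (the threshold values are illustrative, taken from the
+/// `bin/reth` wiring above, not built-in defaults):
+///
+/// ```ignore
+/// let stage = AccountHashingStage {
+///     // rebuild the whole `HashedAccount` table once a sync spans more than this many
+///     // transitions; below it, only changed accounts are re-hashed incrementally
+///     clean_threshold: 500_000,
+///     // hand control back to the pipeline for a commit after this many inserted entries
+///     commit_threshold: 100_000,
+/// };
+/// ```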
+#[derive(Debug)]
+pub struct AccountHashingStage {
+    /// The threshold (in number of transitions) for switching from incremental
+    /// hashing of changes to hashing the whole plain account state.
+    pub clean_threshold: u64,
+    /// The number of inserted items after which control
+    /// flow is returned to the pipeline for a commit.
+    pub commit_threshold: u64,
+}
+
+#[async_trait::async_trait]
+impl<DB: Database> Stage<DB> for AccountHashingStage {
+    /// Return the id of the stage
+    fn id(&self) -> StageId {
+        ACCOUNT_HASHING
+    }
+
+    /// Execute the stage.
+    async fn execute(
+        &mut self,
+        tx: &mut Transaction<'_, DB>,
+        input: ExecInput,
+    ) -> Result<ExecOutput, StageError> {
+        let stage_progress = input.stage_progress.unwrap_or_default();
+        let previous_stage_progress = input.previous_stage_progress();
+
+        // read account changesets, merge them into one changeset and calculate account hashes.
+        let from_transition = tx.get_block_transition(stage_progress)?;
+        let to_transition = tx.get_block_transition(previous_stage_progress)?;
+
+        // If there are more blocks than the threshold, it is faster to iterate over the plain
+        // state and hash all accounts; otherwise, aggregate the changesets and apply the
+        // hashing to the HashedAccount table.
+        if to_transition - from_transition > self.clean_threshold {
+            // clear the table, then load all accounts and hash them
+            tx.clear::<tables::HashedAccount>()?;
+            tx.commit()?;
+
+            let mut first_key = H160::zero();
+            loop {
+                let next_key = {
+                    let mut accounts = tx.cursor_read::<tables::PlainAccountState>()?;
+
+                    let hashed_batch = accounts
+                        .walk(first_key)?
+                        .take(self.commit_threshold as usize)
+                        .map(|res| res.map(|(address, account)| (keccak256(address), account)))
+                        .collect::<Result<BTreeMap<_, _>, _>>()?;
+
+                    // next key of the iterator
+                    let next_key = accounts.next()?;
+
+                    let mut hashes = tx.cursor_write::<tables::HashedAccount>()?;
+                    // iterate and append presorted hashed accounts
+                    hashed_batch.into_iter().try_for_each(|(k, v)| hashes.append(k, v))?;
+                    next_key
+                };
+                tx.commit()?;
+                if let Some((next_key, _)) = next_key {
+                    first_key = next_key;
+                    continue
+                }
+                break
+            }
+        } else {
+            // Aggregate all transition changesets and make a list of accounts that have been
+            // changed.
+            tx.cursor_read::<tables::AccountChangeSet>()?
+                .walk(from_transition)?
+                .take_while(|res| {
+                    res.as_ref().map(|(k, _)| *k < to_transition).unwrap_or_default()
+                })
+                .collect::<Result<Vec<_>, _>>()?
+                .into_iter()
+                // fold all accounts into one set of changed accounts
+                .fold(BTreeSet::new(), |mut accounts: BTreeSet<Address>, (_, account_before)| {
+                    accounts.insert(account_before.address);
+                    accounts
+                })
+                .into_iter()
+                // iterate over the plain state and get the newest value.
+                // The assumption we are okay to make is that the plain state represents the
+                // `previous_stage_progress` state.
+                .map(|address| tx.get::<tables::PlainAccountState>(address).map(|a| (address, a)))
+                .collect::<Result<Vec<_>, _>>()?
+                .into_iter()
+                .try_for_each(|(address, account)| {
+                    let hashed_address = keccak256(address);
+                    if let Some(account) = account {
+                        tx.put::<tables::HashedAccount>(hashed_address, account)
+                    } else {
+                        tx.delete::<tables::HashedAccount>(hashed_address, None).map(|_| ())
+                    }
+                })?;
+        }
+
+        info!(target: "sync::stages::hashing_account", "Stage finished");
+        Ok(ExecOutput { stage_progress: input.previous_stage_progress(), done: true })
+    }
+
+    /// Unwind the stage.
+    async fn unwind(
+        &mut self,
+        tx: &mut Transaction<'_, DB>,
+        input: UnwindInput,
+    ) -> Result<UnwindOutput, StageError> {
+        // There is no threshold on account unwind; we always take the changesets and
+        // apply the past values to the HashedAccount table.
+
+        let from_transition_rev = tx.get_block_transition(input.unwind_to)?;
+        let to_transition_rev = tx.get_block_transition(input.stage_progress)?;
+
+        // Aggregate all transition changesets and make a list of accounts that have been changed.
+        tx.cursor_read::<tables::AccountChangeSet>()?
+            .walk(from_transition_rev)?
+            .take_while(|res| {
+                res.as_ref().map(|(k, _)| *k < to_transition_rev).unwrap_or_default()
+            })
+            .collect::<Result<Vec<_>, _>>()?
+            .into_iter()
+            .rev()
+            // fold all accounts to get the old balances/nonces and the accounts that need to
+            // be removed
+            .fold(
+                BTreeMap::new(),
+                |mut accounts: BTreeMap<Address, Option<Account>>, (_, account_before)| {
+                    accounts.insert(account_before.address, account_before.info);
+                    accounts
+                },
+            )
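+            // Changesets were collected in ascending transition order and are iterated in
+            // reverse here, and `insert` overwrites: the value that survives the fold for each
+            // address is therefore the oldest pre-state, i.e. the account as of `unwind_to`.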
+            .into_iter()
+            // hash the addresses and collect them inside a sorted BTreeMap.
+            // We are doing keccak only once per address.
+            .map(|(address, account)| (keccak256(address), account))
+            .collect::<BTreeMap<_, _>>()
+            .into_iter()
+            // Apply values to HashedAccount (if the Account is None, remove it);
+            .try_for_each(|(hashed_address, account)| {
+                if let Some(account) = account {
+                    tx.put::<tables::HashedAccount>(hashed_address, account)
+                } else {
+                    tx.delete::<tables::HashedAccount>(hashed_address, None).map(|_| ())
+                }
+            })?;
+
+        Ok(UnwindOutput { stage_progress: input.unwind_to })
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::test_utils::{
+        stage_test_suite_ext, ExecuteStageTestRunner, TestRunnerError, UnwindStageTestRunner,
+        PREV_STAGE_ID,
+    };
+    use assert_matches::assert_matches;
+    use reth_interfaces::test_utils::generators::random_block_range;
+    use reth_primitives::{Account, SealedBlock, H256, U256};
+    use reth_provider::insert_canonical_block;
+    use test_utils::*;
+
+    stage_test_suite_ext!(AccountHashingTestRunner);
+
+    #[tokio::test]
+    async fn execute_below_clean_threshold() {
+        let (previous_stage, stage_progress) = (20, 10);
+        // Set up the runner
+        let mut runner = AccountHashingTestRunner::default();
+        runner.set_clean_threshold(1);
+
+        let input = ExecInput {
+            previous_stage: Some((PREV_STAGE_ID, previous_stage)),
+            stage_progress: Some(stage_progress),
+        };
+
+        runner.seed_execution(input).expect("failed to seed execution");
+
+        let rx = runner.execute(input);
+        let result = rx.await.unwrap();
+
+        assert_matches!(result, Ok(ExecOutput { done, stage_progress }) if done && stage_progress == previous_stage);
+
+        // Validate the stage execution
+        assert!(runner.validate_execution(input, result.ok()).is_ok(), "execution validation");
+    }
+
+    mod test_utils {
+        use super::*;
+        use crate::{
+            stages::hashing_account::AccountHashingStage,
+            test_utils::{StageTestRunner, TestTransaction},
+            ExecInput, ExecOutput, UnwindInput,
+        };
+        use reth_db::{
+            cursor::DbCursorRO,
+            models::AccountBeforeTx,
+            tables,
+            transaction::{DbTx, DbTxMut},
+        };
+        use reth_interfaces::test_utils::generators::random_eoa_account_range;
+
+        pub(crate) struct AccountHashingTestRunner {
+            pub(crate) tx: TestTransaction,
+            commit_threshold: u64,
+            clean_threshold: u64,
+        }
+
+        impl AccountHashingTestRunner {
+            pub(crate) fn set_clean_threshold(&mut self, threshold: u64) {
+                self.clean_threshold = threshold;
+            }
+
+            #[allow(dead_code)]
+            pub(crate) fn set_commit_threshold(&mut self, threshold: u64) {
+                self.commit_threshold = threshold;
+            }
+
+            pub(crate) fn insert_blocks(
+                &self,
+                blocks: Vec<SealedBlock>,
+            ) -> Result<(), TestRunnerError> {
+                let mut blocks_iter = blocks.iter();
+                while let Some(block) = blocks_iter.next() {
+                    self.tx.commit(|tx| {
+                        insert_canonical_block(tx, block, true).unwrap();
+                        Ok(())
+                    })?;
+                }
+
+                Ok(())
+            }
+
+            pub(crate) fn insert_accounts(
+                &self,
+                accounts: &Vec<(Address, Account)>,
+            ) -> Result<(), TestRunnerError> {
+                let mut accs_iter = accounts.iter();
+                while let Some((addr, acc)) = accs_iter.next() {
+                    self.tx.commit(|tx| {
+                        tx.put::<tables::PlainAccountState>(*addr, *acc)?;
+                        Ok(())
+                    })?;
+                }
+
+                Ok(())
+            }
+
+            /// Iterates over the PlainAccount table and checks that the accounts match the ones
+            /// in the HashedAccount table
+            pub(crate) fn check_hashed_accounts(&self) -> Result<(), TestRunnerError> {
+                self.tx.query(|tx| {
+                    let mut acc_cursor = tx.cursor_read::<tables::PlainAccountState>()?;
+                    let mut hashed_acc_cursor = tx.cursor_read::<tables::HashedAccount>()?;
+
+                    while let Some((address, account)) = acc_cursor.next()? {
+                        let hashed_addr = keccak256(address);
+                        if let Some((_, acc)) = hashed_acc_cursor.seek_exact(hashed_addr)? {
+                            assert_eq!(acc, account)
+                        }
+                    }
+                    Ok(())
+                })?;
+
+                Ok(())
+            }
+
+            /// Same as `check_hashed_accounts`, but checks against the old account state,
+            /// namely, the same account with nonce - 1 and balance - 1.
+            pub(crate) fn check_old_hashed_accounts(&self) -> Result<(), TestRunnerError> {
+                self.tx.query(|tx| {
+                    let mut acc_cursor = tx.cursor_read::<tables::PlainAccountState>()?;
+                    let mut hashed_acc_cursor = tx.cursor_read::<tables::HashedAccount>()?;
+
+                    while let Some((address, account)) = acc_cursor.next()? {
+                        let Account { nonce, balance, .. } = account;
+                        let old_acc = Account {
+                            nonce: nonce - 1,
+                            balance: balance - U256::from(1),
+                            bytecode_hash: None,
+                        };
+                        let hashed_addr = keccak256(address);
+                        if let Some((_, acc)) = hashed_acc_cursor.seek_exact(hashed_addr)? {
+                            assert_eq!(acc, old_acc)
+                        }
+                    }
+                    Ok(())
+                })?;
+
+                Ok(())
+            }
+        }
+
+        impl Default for AccountHashingTestRunner {
+            fn default() -> Self {
+                Self {
+                    tx: TestTransaction::default(),
+                    commit_threshold: 1000,
+                    clean_threshold: 1000,
+                }
+            }
+        }
+
+        impl StageTestRunner for AccountHashingTestRunner {
+            type S = AccountHashingStage;
+
+            fn tx(&self) -> &TestTransaction {
+                &self.tx
+            }
+
+            fn stage(&self) -> Self::S {
+                Self::S {
+                    commit_threshold: self.commit_threshold,
+                    clean_threshold: self.clean_threshold,
+                }
+            }
+        }
+
+        #[async_trait::async_trait]
+        impl ExecuteStageTestRunner for AccountHashingTestRunner {
+            type Seed = Vec<(Address, Account)>;
+
+            fn seed_execution(&mut self, input: ExecInput) -> Result<Self::Seed, TestRunnerError> {
+                let end = input.previous_stage_progress() + 1;
+
+                let blocks = random_block_range(0..end, H256::zero(), 0..3);
+                self.insert_blocks(blocks)?;
+
+                let n_accounts = 2;
+                let accounts = random_eoa_account_range(&mut (0..n_accounts));
+                self.insert_accounts(&accounts)?;
+
+                // seed the account changeset
+                self.tx
+                    .commit(|tx| {
+                        let (_, last_transition) =
+                            tx.cursor_read::<tables::BlockTransitionIndex>()?.last()?.unwrap();
+
+                        let first_transition =
+                            last_transition.checked_sub(n_accounts).unwrap_or_default();
+
+                        for (t, (addr, acc)) in (first_transition..last_transition).zip(&accounts) {
+                            let Account { nonce, balance, .. } = acc;
+                            let prev_acc = Account {
+                                nonce: nonce - 1,
+                                balance: balance - U256::from(1),
+                                bytecode_hash: None,
+                            };
+                            let acc_before_tx =
+                                AccountBeforeTx { address: *addr, info: Some(prev_acc) };
+                            tx.put::<tables::AccountChangeSet>(t, acc_before_tx)?;
+                        }
+
+                        Ok(())
+                    })
+                    .unwrap();
+
+                Ok(accounts)
+            }
+
+            fn validate_execution(
+                &self,
+                input: ExecInput,
+                output: Option<ExecOutput>,
+            ) -> Result<(), TestRunnerError> {
+                if let Some(output) = output {
+                    let start_block = input.stage_progress.unwrap_or_default() + 1;
+                    let end_block = output.stage_progress;
+                    if start_block > end_block {
+                        return Ok(())
+                    }
+                }
+                self.check_hashed_accounts()
+            }
+        }
+
+        impl UnwindStageTestRunner for AccountHashingTestRunner {
+            fn validate_unwind(&self, _input: UnwindInput) -> Result<(), TestRunnerError> {
+                self.check_old_hashed_accounts()
+            }
+        }
+    }
+}
diff --git a/crates/stages/src/stages/hashing_storage.rs b/crates/stages/src/stages/hashing_storage.rs
new file mode 100644
index 0000000000..f902d1851f
--- /dev/null
+++ b/crates/stages/src/stages/hashing_storage.rs
@@ -0,0 +1,475 @@
+use crate::{
+    db::Transaction, ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput, UnwindOutput,
+};
+use reth_db::{
+    cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO, DbDupCursorRW},
+    database::Database,
+    models::TransitionIdAddress,
+    tables,
+    transaction::{DbTx, DbTxMut},
+};
+use reth_primitives::{keccak256, Address, StorageEntry, H160, H256, U256};
+use std::{collections::BTreeMap, fmt::Debug};
+use tracing::*;
+
+const STORAGE_HASHING: StageId = StageId("StorageHashingStage");
+
+/// Storage hashing stage hashes plain storage.
+/// This is a preparation step before generating the intermediate hashes and calculating the
+/// Merkle tree root.
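+///
+/// A minimal construction sketch (the threshold values are illustrative, taken from the
+/// `bin/reth` wiring above, not built-in defaults):
+///
+/// ```ignore
+/// let stage = StorageHashingStage {
+///     // rebuild the whole `HashedStorage` table once a sync spans more than this many
+///     // transitions; below it, only changed slots are re-hashed incrementally
+///     clean_threshold: 500_000,
+///     // hand control back to the pipeline for a commit after this many inserted entries
+///     commit_threshold: 100_000,
+/// };
+/// ```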
+#[derive(Debug)]
+pub struct StorageHashingStage {
+    /// The threshold (in number of transitions) for switching from incremental
+    /// hashing of changes to hashing the whole plain storage state.
+    pub clean_threshold: u64,
+    /// The number of inserted items after which control
+    /// flow is returned to the pipeline for a commit.
+    pub commit_threshold: u64,
+}
+
+#[async_trait::async_trait]
+impl<DB: Database> Stage<DB> for StorageHashingStage {
+    /// Return the id of the stage
+    fn id(&self) -> StageId {
+        STORAGE_HASHING
+    }
+
+    /// Execute the stage.
+    async fn execute(
+        &mut self,
+        tx: &mut Transaction<'_, DB>,
+        input: ExecInput,
+    ) -> Result<ExecOutput, StageError> {
+        let stage_progress = input.stage_progress.unwrap_or_default();
+        let previous_stage_progress = input.previous_stage_progress();
+
+        // read storage changesets, merge them into one changeset and calculate storage hashes.
+        let from_transition = tx.get_block_transition(stage_progress)? + 1;
+        let to_transition = tx.get_block_transition(previous_stage_progress)? + 1;
+
+        // If there are more blocks than the threshold, it is faster to iterate over the plain
+        // state and hash all storage slots; otherwise, aggregate the changesets and apply the
+        // hashing to the HashedStorage table.
+        if to_transition - from_transition > self.clean_threshold {
+            tx.clear::<tables::HashedStorage>()?;
+            tx.commit()?;
+
+            let mut first_key = H160::zero();
+            loop {
+                let next_key = {
+                    let mut storage = tx.cursor_dup_read::<tables::PlainStorageState>()?;
+
+                    let hashed_batch = storage
+                        .walk(first_key)?
+                        .take(self.commit_threshold as usize)
+                        .map(|res| {
+                            res.map(|(address, mut slot)| {
+                                // both the account address and the storage slot key are hashed
+                                // for the merkle tree.
+                                slot.key = keccak256(slot.key);
+                                (keccak256(address), slot)
+                            })
+                        })
+                        .collect::<Result<Vec<_>, _>>()?;
+
+                    // next key of the iterator
+                    let next_key = storage.next()?;
+
+                    let mut hashes = tx.cursor_write::<tables::HashedStorage>()?;
+                    // iterate and append presorted hashed slots
+                    hashed_batch.into_iter().try_for_each(|(k, v)| hashes.append(k, v))?;
+                    next_key
+                };
+                tx.commit()?;
+                if let Some((next_key, _)) = next_key {
+                    first_key = next_key;
+                    continue
+                }
+                break
+            }
+        } else {
+            let mut plain_storage = tx.cursor_dup_read::<tables::PlainStorageState>()?;
+
+            // Aggregate all transition changesets and make a list of the storage slots that
+            // have been changed.
+            tx.cursor_read::<tables::StorageChangeSet>()?
+                .walk((from_transition, H160::zero()).into())?
+                .take_while(|res| {
+                    res.as_ref().map(|(k, _)| k.0 .0 < to_transition).unwrap_or_default()
+                })
+                .collect::<Result<Vec<_>, _>>()?
+                .into_iter()
+                // fold all storages and save their old state so we can remove them from the
+                // HashedStorage table; this is needed because it is a dup table.
+                .fold(
+                    BTreeMap::new(),
+                    |mut accounts: BTreeMap<Address, BTreeMap<H256, U256>>,
+                     (TransitionIdAddress((_, address)), storage_entry)| {
+                        accounts
+                            .entry(address)
+                            .or_default()
+                            .insert(storage_entry.key, storage_entry.value);
+                        accounts
+                    },
+                )
+                .into_iter()
+                // iterate over the plain state and get the newest storage value.
+                // The assumption we are okay with is that the plain state represents the
+                // `previous_stage_progress` state.
+                .map(|(address, storage)| {
+                    storage
+                        .into_iter()
+                        .map(|(key, val)| {
+                            plain_storage
+                                .seek_by_key_subkey(address, key)
+                                .map(|ret| (key, (val, ret.map(|e| e.value))))
+                        })
+                        .collect::<Result<BTreeMap<_, _>, _>>()
+                        .map(|storage| (address, storage))
+                })
+                .collect::<Result<BTreeMap<_, _>, _>>()?
+                .into_iter()
+                // Hash the address and key and apply them to HashedStorage (if the Storage is
+                // None, just remove it);
+                .try_for_each(|(address, storage)| {
+                    let hashed_address = keccak256(address);
+                    storage.into_iter().try_for_each(
+                        |(key, (old_val, new_val))| -> Result<(), StageError> {
+                            let key = keccak256(key);
+                            tx.delete::<tables::HashedStorage>(
+                                hashed_address,
+                                Some(StorageEntry { key, value: old_val }),
+                            )?;
+                            if let Some(value) = new_val {
+                                let val = StorageEntry { key, value };
+                                tx.put::<tables::HashedStorage>(hashed_address, val)?
+                            }
+                            Ok(())
+                        },
+                    )
+                })?;
+        }
+
+        info!(target: "sync::stages::hashing_storage", "Stage finished");
+        Ok(ExecOutput { stage_progress: input.previous_stage_progress(), done: true })
+    }
+
+    /// Unwind the stage.
+    async fn unwind(
+        &mut self,
+        tx: &mut Transaction<'_, DB>,
+        input: UnwindInput,
+    ) -> Result<UnwindOutput, StageError> {
+        let from_transition_rev = tx.get_block_transition(input.unwind_to)? + 1;
+        let to_transition_rev = tx.get_block_transition(input.stage_progress)? + 1;
+
+        let mut hashed_storage = tx.cursor_dup_write::<tables::HashedStorage>()?;
+
+        // Aggregate all transition changesets and make a list of the storage slots that have
+        // been changed.
+        tx.cursor_read::<tables::StorageChangeSet>()?
+            .walk((from_transition_rev, H160::zero()).into())?
+            .take_while(|res| {
+                res.as_ref()
+                    .map(|(TransitionIdAddress((k, _)), _)| *k < to_transition_rev)
+                    .unwrap_or_default()
+            })
+            .collect::<Result<Vec<_>, _>>()?
+            .into_iter()
+            .rev()
+            // fold all slots to get the old values and the slots that need to be removed
+            .fold(
+                BTreeMap::new(),
+                |mut accounts: BTreeMap<(Address, H256), U256>,
+                 (TransitionIdAddress((_, address)), storage_entry)| {
+                    accounts.insert((address, storage_entry.key), storage_entry.value);
+                    accounts
+                },
+            )
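+            // As in the account unwind, reverse iteration plus an overwriting `insert` means
+            // the surviving value per (address, slot key) pair is the oldest one, i.e. the
+            // slot value as of `unwind_to`.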
+            .into_iter()
+            // hash the addresses and keys and collect them inside a sorted BTreeMap.
+            // We are doing keccak only once per address/key.
+            .map(|((address, key), value)| ((keccak256(address), keccak256(key)), value))
+            .collect::<BTreeMap<_, _>>()
+            .into_iter()
+            // Apply values to HashedStorage (if the value is zero, remove it);
+            .try_for_each(|((address, key), value)| -> Result<(), StageError> {
+                hashed_storage.seek_by_key_subkey(address, key)?;
+                hashed_storage.delete_current()?;
+
+                if value != U256::ZERO {
+                    hashed_storage.append_dup(address, StorageEntry { key, value })?;
+                }
+                Ok(())
+            })?;
+
+        Ok(UnwindOutput { stage_progress: input.unwind_to })
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::test_utils::{
+        stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, TestRunnerError,
+        TestTransaction, UnwindStageTestRunner, PREV_STAGE_ID,
+    };
+    use assert_matches::assert_matches;
+    use reth_db::{
+        cursor::DbCursorRW,
+        mdbx::{tx::Tx, WriteMap, RW},
+        models::{BlockNumHash, StoredBlockBody, TransitionIdAddress},
+    };
+    use reth_interfaces::test_utils::generators::random_block_range;
+    use reth_primitives::{
+        SealedBlock, StorageEntry, Transaction, TransactionKind, TxLegacy, H256, U256,
+    };
+
+    stage_test_suite_ext!(StorageHashingTestRunner);
+
+    /// Execute with a low clean threshold so as to hash the whole storage
+    #[tokio::test]
+    async fn execute_clean() {
+        let (previous_stage, stage_progress) = (500, 100);
+
+        // Set up the runner
+        let mut runner = StorageHashingTestRunner::default();
+        // set a low threshold so we hash the whole storage
+        runner.set_clean_threshold(1);
+        let input = ExecInput {
+            previous_stage: Some((PREV_STAGE_ID, previous_stage)),
+            stage_progress: Some(stage_progress),
+        };
+
+        runner.seed_execution(input).expect("failed to seed execution");
+
+        let rx = runner.execute(input);
+
+        // Assert the successful result
+        let result = rx.await.unwrap();
+        assert_matches!(
+            result,
+            Ok(ExecOutput { done, stage_progress })
+                if done && stage_progress == previous_stage
+        );
+
+        // Validate the stage execution
+        assert!(runner.validate_execution(input, result.ok()).is_ok(), "execution validation");
+    }
+
+    struct StorageHashingTestRunner {
+        tx: TestTransaction,
+        commit_threshold: u64,
+        clean_threshold: u64,
+    }
+
+    impl Default for StorageHashingTestRunner {
+        fn default() -> Self {
+            Self { tx: TestTransaction::default(), commit_threshold: 1000, clean_threshold: 1000 }
+        }
+    }
+
+    impl StageTestRunner for StorageHashingTestRunner {
+        type S = StorageHashingStage;
+
+        fn tx(&self) -> &TestTransaction {
+            &self.tx
+        }
+
+        fn stage(&self) -> Self::S {
+            Self::S {
+                commit_threshold: self.commit_threshold,
+                clean_threshold: self.clean_threshold,
+            }
+        }
+    }
+
+    #[async_trait::async_trait]
+    impl ExecuteStageTestRunner for StorageHashingTestRunner {
+        type Seed = Vec<SealedBlock>;
+
+        fn seed_execution(&mut self, input: ExecInput) -> Result<Self::Seed, TestRunnerError> {
+            let stage_progress = input.stage_progress.unwrap_or_default();
+            let end = input.previous_stage_progress() + 1;
+
+            let blocks = random_block_range(stage_progress..end, H256::zero(), 0..3);
+
+            self.tx.insert_headers(blocks.iter().map(|block| &block.header))?;
+
+            let mut iter = blocks.iter();
+            let (mut transition_id, mut tx_id) = (0, 0);
+
+            while let Some(progress) = iter.next() {
+                // Insert last progress data
+                self.tx.commit(|tx| {
+                    let key: BlockNumHash = (progress.number, progress.hash()).into();
+
+                    let body = StoredBlockBody {
+                        start_tx_id: tx_id,
+                        tx_count: progress.body.len() as u64,
+                    };
+
+                    progress.body.iter().try_for_each(|transaction| {
+                        tx.put::<tables::TxHashNumber>(transaction.hash(), tx_id)?;
+                        tx.put::<tables::Transactions>(tx_id, transaction.clone())?;
+                        tx.put::<tables::TxTransitionIndex>(tx_id, transition_id)?;
+                        tx_id += 1;
+                        transition_id += 1;
+                        let (to, value) = match transaction.transaction {
+                            Transaction::Legacy(TxLegacy {
+                                to: TransactionKind::Call(to),
+                                value,
+                                ..
+                            }) => (to, value),
+                            _ => unreachable!(),
+                        };
+                        let new_entry =
+                            StorageEntry { key: keccak256("transfers"), value: U256::from(value) };
+                        self.insert_storage_entry(
+                            tx,
+                            (transition_id, to).into(),
+                            new_entry,
+                            progress.header.number == stage_progress,
+                        )
+                    })?;
+
+                    // Randomize rewards
+                    let has_reward: bool = rand::random();
+                    if has_reward {
+                        transition_id += 1;
+                        self.insert_storage_entry(
+                            tx,
+                            (transition_id, Address::random()).into(),
+                            StorageEntry {
+                                key: keccak256("mining"),
+                                value: U256::from(rand::random::<u64>()),
+                            },
+                            progress.header.number == stage_progress,
+                        )?;
+                    }
+
+                    tx.put::<tables::BlockTransitionIndex>(key.number(), transition_id)?;
+                    tx.put::<tables::BlockBodies>(key, body)
+                })?;
+            }
+
+            Ok(blocks)
+        }
+
+        fn validate_execution(
+            &self,
+            input: ExecInput,
+            output: Option<ExecOutput>,
+        ) -> Result<(), TestRunnerError> {
+            if let Some(output) = output {
+                let start_block = input.stage_progress.unwrap_or_default() + 1;
+                let end_block = output.stage_progress;
+                if start_block > end_block {
+                    return Ok(())
+                }
+            }
+            self.check_hashed_storage()
+        }
+    }
+
+    impl UnwindStageTestRunner for StorageHashingTestRunner {
+        fn validate_unwind(&self, input: UnwindInput) -> Result<(), TestRunnerError> {
+            self.unwind_storage(input)?;
+            self.check_hashed_storage()
+        }
+    }
+
+    impl StorageHashingTestRunner {
+        fn set_clean_threshold(&mut self, threshold: u64) {
+            self.clean_threshold = threshold;
+        }
+
+        fn check_hashed_storage(&self) -> Result<(), TestRunnerError> {
+            self.tx
+                .query(|tx| {
+                    let mut storage_cursor = tx.cursor_dup_read::<tables::PlainStorageState>()?;
+                    let mut hashed_storage_cursor =
+                        tx.cursor_dup_read::<tables::HashedStorage>()?;
+
+                    let mut expected = 0;
+
+                    while let Some((address, entry)) = storage_cursor.next()? {
+                        let key = keccak256(entry.key);
+                        let got =
+                            hashed_storage_cursor.seek_by_key_subkey(keccak256(address), key)?;
+                        assert_eq!(
+                            got,
+                            Some(StorageEntry { key, ..entry }),
+                            "{expected}: {address:?}"
+                        );
+                        expected += 1;
+                    }
+                    let count = tx
+                        .cursor_dup_read::<tables::HashedStorage>()?
+                        .walk([0; 32].into())?
+                        .count();
+
+                    assert_eq!(count, expected);
+                    Ok(())
+                })
+                .map_err(|e| e.into())
+        }
+
+        fn insert_storage_entry(
+            &self,
+            tx: &Tx<'_, RW, WriteMap>,
+            tid_address: TransitionIdAddress,
+            entry: StorageEntry,
+            hash: bool,
+        ) -> Result<(), reth_db::Error> {
+            let mut storage_cursor = tx.cursor_dup_write::<tables::PlainStorageState>()?;
+            let prev_entry = storage_cursor
+                .seek_by_key_subkey(tid_address.address(), entry.key)?
+                .and_then(|e| {
+                    storage_cursor.delete_current().expect("failed to delete entry");
+                    Some(e)
+                })
+                .unwrap_or(StorageEntry { key: entry.key, value: U256::from(0) });
+            if hash {
+                tx.cursor_dup_write::<tables::HashedStorage>()?.append_dup(
+                    keccak256(tid_address.address()),
+                    StorageEntry { key: keccak256(entry.key), value: entry.value },
+                )?;
+            }
+            storage_cursor.append_dup(tid_address.address(), entry)?;
+
+            tx.cursor_dup_write::<tables::StorageChangeSet>()?
+                .append_dup(tid_address, prev_entry)?;
+            Ok(())
+        }
+
+        fn unwind_storage(&self, input: UnwindInput) -> Result<(), TestRunnerError> {
+            tracing::debug!("unwinding storage...");
+            let target_transition = self
+                .tx
+                .inner()
+                .get_block_transition(input.unwind_to)
+                .map_err(|e| TestRunnerError::Internal(Box::new(e)))?;
+            self.tx.commit(|tx| {
+                let mut storage_cursor = tx.cursor_dup_write::<tables::PlainStorageState>()?;
+                let mut changeset_cursor = tx.cursor_dup_read::<tables::StorageChangeSet>()?;
+                let mut row = changeset_cursor.last()?;
+
+                while let Some((tid_address, entry)) = row {
+                    if tid_address.transition_id() <= target_transition {
+                        break
+                    }
+
+                    storage_cursor.seek_by_key_subkey(tid_address.address(), entry.key)?;
+                    storage_cursor.delete_current()?;
+
+                    if entry.value != U256::ZERO {
+                        storage_cursor.append_dup(tid_address.address(), entry)?;
+                    }
+
+                    row = changeset_cursor.prev()?;
+                }
+                Ok(())
+            })?;
+            Ok(())
+        }
+    }
+}
diff --git a/crates/stages/src/stages/merkle.rs b/crates/stages/src/stages/merkle.rs
new file mode 100644
index 0000000000..73f4cc4004
--- /dev/null
+++ b/crates/stages/src/stages/merkle.rs
@@ -0,0 +1,64 @@
+use crate::{
+    db::Transaction, ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput, UnwindOutput,
+};
+use reth_db::database::Database;
+use std::fmt::Debug;
+use tracing::*;
+
+const MERKLE_EXECUTION: StageId = StageId("MerkleExecuteStage");
+const MERKLE_UNWIND: StageId = StageId("MerkleUnwindStage");
+
+/// The merkle stage uses the input from the [AccountHashingStage] and [StorageHashingStage]
+/// stages and calculates the intermediate hashes and the state root.
+/// This stage depends on the Account and Storage hashing stages: it runs after them during
+/// execution, and before them during unwinding.
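+///
+/// A minimal wiring sketch: one instance is pushed before the hashing stages to handle
+/// unwinds, and one after them to handle execution (mirroring the `bin/reth` pipeline above):
+///
+/// ```ignore
+/// pipeline
+///     .push(MerkleStage { is_execute: false }) // unwind-only
+///     .push(AccountHashingStage { clean_threshold: 500_000, commit_threshold: 100_000 })
+///     .push(StorageHashingStage { clean_threshold: 500_000, commit_threshold: 100_000 })
+///     .push(MerkleStage { is_execute: true }); // execute-only
+/// ```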
+#[derive(Debug)]
+pub struct MerkleStage {
+    /// If `true`, the stage does the work on `execute` and skips `unwind`; if `false`, it
+    /// skips `execute` and does the work on `unwind`.
+    pub is_execute: bool,
+}
+
+#[async_trait::async_trait]
+impl<DB: Database> Stage<DB> for MerkleStage {
+    /// Return the id of the stage
+    fn id(&self) -> StageId {
+        if self.is_execute {
+            MERKLE_EXECUTION
+        } else {
+            MERKLE_UNWIND
+        }
+    }
+
+    /// Execute the stage.
+    async fn execute(
+        &mut self,
+        _tx: &mut Transaction<'_, DB>,
+        input: ExecInput,
+    ) -> Result<ExecOutput, StageError> {
+        if !self.is_execute {
+            info!(target: "sync::stages::merkle::unwind", "Stage is always skipped");
+            return Ok(ExecOutput { stage_progress: input.previous_stage_progress(), done: true })
+        }
+
+        // Iterate over changesets (similar to the Hashing stages) and take the new values
+
+        info!(target: "sync::stages::merkle::exec", "Stage finished");
+        Ok(ExecOutput { stage_progress: input.previous_stage_progress(), done: true })
+    }
+
+    /// Unwind the stage.
+    async fn unwind(
+        &mut self,
+        _tx: &mut Transaction<'_, DB>,
+        input: UnwindInput,
+    ) -> Result<UnwindOutput, StageError> {
+        if self.is_execute {
+            info!(target: "sync::stages::merkle::exec", "Stage is always skipped");
+            return Ok(UnwindOutput { stage_progress: input.unwind_to })
+        }
+
+        info!(target: "sync::stages::merkle::unwind", "Stage finished");
+        Ok(UnwindOutput { stage_progress: input.unwind_to })
+    }
+}
diff --git a/crates/stages/src/stages/mod.rs b/crates/stages/src/stages/mod.rs
index 4bc1860a8d..88dd5e17a2 100644
--- a/crates/stages/src/stages/mod.rs
+++ b/crates/stages/src/stages/mod.rs
@@ -2,8 +2,14 @@
 pub mod bodies;
 /// The execution stage that generates state diff.
 pub mod execution;
+/// Account hashing stage.
+pub mod hashing_account;
+/// Storage hashing stage.
+pub mod hashing_storage;
 /// The headers stage.
 pub mod headers;
+/// Intermediate hashes and merkle root creation.
+pub mod merkle;
 /// The sender recovery stage.
 pub mod sender_recovery;
 /// The total difficulty stage
diff --git a/crates/storage/db/src/tables/mod.rs b/crates/storage/db/src/tables/mod.rs
index 69b4db2479..df1e9333d6 100644
--- a/crates/storage/db/src/tables/mod.rs
+++ b/crates/storage/db/src/tables/mod.rs
@@ -33,7 +33,7 @@ pub enum TableType {
 }
 
 /// Default tables that should be present inside database.
-pub const TABLES: [(TableType, &str); 23] = [
+pub const TABLES: [(TableType, &str); 25] = [
     (TableType::Table, CanonicalHeaders::const_name()),
     (TableType::Table, HeaderTD::const_name()),
     (TableType::Table, HeaderNumbers::const_name()),
@@ -54,6 +54,8 @@ pub const TABLES: [(TableType, &str); 23] = [
     (TableType::Table, StorageHistory::const_name()),
     (TableType::DupSort, AccountChangeSet::const_name()),
     (TableType::DupSort, StorageChangeSet::const_name()),
+    (TableType::Table, HashedAccount::const_name()),
+    (TableType::DupSort, HashedStorage::const_name()),
     (TableType::Table, TxSenders::const_name()),
     (TableType::Table, Config::const_name()),
     (TableType::Table, SyncStage::const_name()),
@@ -163,11 +165,6 @@ table!(
     ( Logs ) TxNumber | Receipt
 );
 
-table!(
-    /// Stores the current state of an [`Account`].
-    ( PlainAccountState ) Address | Account
-);
-
 table!(
     /// Stores all smart contract bytecodes.
     /// There will be multiple accounts that have same bytecode
@@ -190,6 +187,11 @@ table!(
     ( TxTransitionIndex ) TxNumber | TransitionId
 );
 
+table!(
+    /// Stores the current state of an [`Account`].
+    ( PlainAccountState ) Address | Account
+);
+
 dupsort!(
     /// Stores the current value of a storage key.
     ( PlainStorageState ) Address | [H256] StorageEntry
@@ -260,6 +262,22 @@ dupsort!(
     ( StorageChangeSet ) TransitionIdAddress | [H256] StorageEntry
 );
 
+table!(
+    /// Stores the current state of an [`Account`] indexed with `keccak256(Address)`.
+    /// This table is in preparation for merkleization and the calculation of the state root.
+    /// We store the whole account data, as it is needed for a partial update when
+    /// part of the storage is changed. The benefit for merkleization is that hashed addresses are sorted.
+    ( HashedAccount ) H256 | Account
+);
+
+dupsort!(
+    /// Stores the current storage values indexed with `keccak256(Address)` and
+    /// the hash of the storage key `keccak256(key)`.
+    /// This table is in preparation for merkleization and the calculation of the state root.
+    /// The benefit for merkleization is that hashed addresses/keys are sorted.
+    ( HashedStorage ) H256 | [H256] StorageEntry
+);
+
 table!(
     /// Stores the transaction sender for each transaction.
     /// It is needed to speed up execution stage and allows fetching signer without doing