diff --git a/Cargo.lock b/Cargo.lock index 9929ef06af..39b95fa1a0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5558,7 +5558,6 @@ dependencies = [ "reth-revm-primitives", "reth-rlp", "reth-trie", - "thiserror", "tokio", "tokio-stream", "tracing", diff --git a/crates/interfaces/src/provider.rs b/crates/interfaces/src/provider.rs index 6ef7517d00..f9ed2a8dc4 100644 --- a/crates/interfaces/src/provider.rs +++ b/crates/interfaces/src/provider.rs @@ -4,6 +4,8 @@ use reth_primitives::{Address, BlockHash, BlockHashOrNumber, BlockNumber, TxNumb #[allow(missing_docs)] #[derive(Debug, thiserror::Error, PartialEq, Eq, Clone)] pub enum ProviderError { + #[error(transparent)] + Database(#[from] crate::db::DatabaseError), /// The header number was not found for the given block hash. #[error("Block hash {0:?} does not exist in Headers table")] BlockHashNotFound(BlockHash), @@ -68,4 +70,28 @@ pub enum ProviderError { /// Unable to find the block number for a given transaction index #[error("Unable to find the block number for a given transaction index")] BlockNumberForTransactionIndexNotFound, + /// Root mismatch + #[error("Merkle trie root mismatch at #{block_number} ({block_hash:?}). Got: {got:?}. Expected: {expected:?}")] + StateRootMismatch { + /// Expected root + expected: H256, + /// Calculated root + got: H256, + /// Block number + block_number: BlockNumber, + /// Block hash + block_hash: BlockHash, + }, + /// Root mismatch during unwind + #[error("Unwind merkle trie root mismatch at #{block_number} ({block_hash:?}). Got: {got:?}. Expected: {expected:?}")] + UnwindStateRootMismatch { + /// Expected root + expected: H256, + /// Calculated root + got: H256, + /// Target block number + block_number: BlockNumber, + /// Block hash + block_hash: BlockHash, + }, } diff --git a/crates/staged-sync/src/utils/init.rs b/crates/staged-sync/src/utils/init.rs index d04a11f0db..869de558b6 100644 --- a/crates/staged-sync/src/utils/init.rs +++ b/crates/staged-sync/src/utils/init.rs @@ -8,10 +8,8 @@ use reth_db::{ transaction::{DbTx, DbTxMut}, version::{check_db_version_file, create_db_version_file, DatabaseVersionError}, }; -use reth_primitives::{stage::StageId, Account, Bytecode, ChainSpec, H256, U256}; -use reth_provider::{ - AccountWriter, DatabaseProviderRW, PostState, ProviderFactory, TransactionError, -}; +use reth_primitives::{stage::StageId, Account, Bytecode, ChainSpec, StorageEntry, H256, U256}; +use reth_provider::{AccountWriter, DatabaseProviderRW, HashingWriter, PostState, ProviderFactory}; use std::{fs, path::Path, sync::Arc}; use tracing::debug; @@ -50,10 +48,6 @@ pub enum InitDatabaseError { database_hash: H256, }, - /// Higher level error encountered when using a Transaction. - #[error(transparent)] - TransactionError(#[from] TransactionError), - /// Low-level database error. #[error(transparent)] DBError(#[from] reth_db::DatabaseError), @@ -155,7 +149,12 @@ pub fn insert_genesis_hashes( let alloc_storage = genesis.alloc.clone().into_iter().filter_map(|(addr, account)| { // only return Some if there is storage - account.storage.map(|storage| (addr, storage.into_iter().map(|(k, v)| (k, v.into())))) + account.storage.map(|storage| { + ( + addr, + storage.into_iter().map(|(key, value)| StorageEntry { key, value: value.into() }), + ) + }) }); provider.insert_storage_for_hashing(alloc_storage)?; provider.commit()?; diff --git a/crates/stages/src/error.rs b/crates/stages/src/error.rs index b05b091db5..20310111ca 100644 --- a/crates/stages/src/error.rs +++ b/crates/stages/src/error.rs @@ -4,7 +4,6 @@ use reth_interfaces::{ provider::ProviderError, }; use reth_primitives::SealedHeader; -use reth_provider::TransactionError; use thiserror::Error; use tokio::sync::mpsc::error::SendError; @@ -59,9 +58,6 @@ pub enum StageError { /// The stage encountered a database integrity error. #[error("A database integrity error occurred: {0}")] DatabaseIntegrity(#[from] ProviderError), - /// The stage encountered an error related to the current database transaction. - #[error("A database transaction error occurred: {0}")] - Transaction(#[from] TransactionError), /// Invalid download response. Applicable for stages which /// rely on external downloaders #[error("Invalid download response: {0}")] @@ -92,8 +88,7 @@ impl StageError { StageError::DatabaseIntegrity(_) | StageError::StageCheckpoint(_) | StageError::ChannelClosed | - StageError::Fatal(_) | - StageError::Transaction(_) + StageError::Fatal(_) ) } } diff --git a/crates/stages/src/stages/hashing_storage.rs b/crates/stages/src/stages/hashing_storage.rs index 2c298de335..db4600a02b 100644 --- a/crates/stages/src/stages/hashing_storage.rs +++ b/crates/stages/src/stages/hashing_storage.rs @@ -16,7 +16,7 @@ use reth_primitives::{ }, StorageEntry, }; -use reth_provider::DatabaseProviderRW; +use reth_provider::{DatabaseProviderRW, HashingWriter, StorageReader}; use std::{collections::BTreeMap, fmt::Debug}; use tracing::*; @@ -171,12 +171,11 @@ impl Stage for StorageHashingStage { } else { // Aggregate all changesets and and make list of storages that have been // changed. - let lists = - provider.get_addresses_and_keys_of_changed_storages(from_block..=to_block)?; + let lists = provider.changed_storages_with_range(from_block..=to_block)?; // iterate over plain state and get newest storage value. // Assumption we are okay with is that plain state represent // `previous_stage_progress` state. - let storages = provider.get_plainstate_storages(lists)?; + let storages = provider.plainstate_storages(lists)?; provider.insert_storage_for_hashing(storages.into_iter())?; } diff --git a/crates/stages/src/stages/index_storage_history.rs b/crates/stages/src/stages/index_storage_history.rs index 1abb389b3f..31d033173c 100644 --- a/crates/stages/src/stages/index_storage_history.rs +++ b/crates/stages/src/stages/index_storage_history.rs @@ -1,7 +1,7 @@ use crate::{ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput}; use reth_db::{database::Database, models::BlockNumberAddress}; use reth_primitives::stage::{StageCheckpoint, StageId}; -use reth_provider::DatabaseProviderRW; +use reth_provider::{DatabaseProviderRW, HistoryWriter, StorageReader}; use std::fmt::Debug; /// Stage is indexing history the account changesets generated in @@ -46,7 +46,7 @@ impl Stage for IndexStorageHistoryStage { let (range, is_final_range) = input.next_block_range_with_threshold(self.commit_threshold); - let indices = provider.get_storage_block_numbers_from_changesets(range.clone())?; + let indices = provider.changed_storages_and_blocks_with_range(range.clone())?; provider.insert_storage_history_index(indices)?; Ok(ExecOutput { checkpoint: StageCheckpoint::new(*range.end()), done: is_final_range }) diff --git a/crates/storage/provider/Cargo.toml b/crates/storage/provider/Cargo.toml index 22fcaba5e7..25b7e9f4c8 100644 --- a/crates/storage/provider/Cargo.toml +++ b/crates/storage/provider/Cargo.toml @@ -24,7 +24,6 @@ tokio-stream = { workspace = true, features = ["sync"] } tracing = { workspace = true } # misc -thiserror = { workspace = true } auto_impl = "1.0" itertools = "0.10" pin-project = { workspace = true } diff --git a/crates/storage/provider/src/lib.rs b/crates/storage/provider/src/lib.rs index 5a988cf88c..a0c2055684 100644 --- a/crates/storage/provider/src/lib.rs +++ b/crates/storage/provider/src/lib.rs @@ -19,9 +19,10 @@ pub use traits::{ BlockIdProvider, BlockNumProvider, BlockProvider, BlockProviderIdExt, BlockSource, BlockchainTreePendingStateProvider, CanonChainTracker, CanonStateNotification, CanonStateNotificationSender, CanonStateNotifications, CanonStateSubscriptions, EvmEnvProvider, - ExecutorFactory, HeaderProvider, PostStateDataProvider, ReceiptProvider, ReceiptProviderIdExt, - StageCheckpointReader, StageCheckpointWriter, StateProvider, StateProviderBox, - StateProviderFactory, StateRootProvider, TransactionsProvider, WithdrawalsProvider, + ExecutorFactory, HashingWriter, HeaderProvider, HistoryWriter, PostStateDataProvider, + ReceiptProvider, ReceiptProviderIdExt, StageCheckpointReader, StageCheckpointWriter, + StateProvider, StateProviderBox, StateProviderFactory, StateRootProvider, StorageReader, + TransactionsProvider, WithdrawalsProvider, }; /// Provider trait implementations. @@ -35,10 +36,6 @@ pub use providers::{ pub mod post_state; pub use post_state::PostState; -/// Helper types for interacting with the database -mod transaction; -pub use transaction::TransactionError; - /// Common database utilities. mod utils; pub use utils::{insert_block, insert_canonical_block}; diff --git a/crates/storage/provider/src/providers/database/provider.rs b/crates/storage/provider/src/providers/database/provider.rs index b9b6ea5203..86e7429fe7 100644 --- a/crates/storage/provider/src/providers/database/provider.rs +++ b/crates/storage/provider/src/providers/database/provider.rs @@ -3,8 +3,8 @@ use crate::{ post_state::StorageChangeset, traits::{AccountExtReader, BlockSource, ReceiptProvider, StageCheckpointWriter}, AccountReader, AccountWriter, BlockHashProvider, BlockNumProvider, BlockProvider, - EvmEnvProvider, HeaderProvider, PostState, ProviderError, StageCheckpointReader, - TransactionError, TransactionsProvider, WithdrawalsProvider, + EvmEnvProvider, HashingWriter, HeaderProvider, HistoryWriter, PostState, ProviderError, + StageCheckpointReader, StorageReader, TransactionsProvider, WithdrawalsProvider, }; use itertools::{izip, Itertools}; use reth_db::{ @@ -145,7 +145,7 @@ fn unwind_storage_history_shards<'a, TX: reth_db::transaction::DbTxMutGAT<'a>>( address: Address, storage_key: H256, block_number: BlockNumber, -) -> std::result::Result, TransactionError> { +) -> Result> { let mut item = cursor.seek_exact(StorageShardedKey::new(address, storage_key, u64::MAX))?; while let Some((storage_sharded_key, list)) = item { @@ -205,77 +205,6 @@ impl<'this, TX: DbTx<'this>> DatabaseProvider<'this, TX> { .walk(Some(T::Key::default()))? .collect::, DatabaseError>>() } - - // TODO(joshie) TEMPORARY should be moved to trait providers - - /// Iterate over account changesets and return all account address that were changed. - pub fn get_addresses_and_keys_of_changed_storages( - &self, - range: RangeInclusive, - ) -> std::result::Result>, TransactionError> { - self.tx - .cursor_read::()? - .walk_range(BlockNumberAddress::range(range))? - // fold all storages and save its old state so we can remove it from HashedStorage - // it is needed as it is dup table. - .try_fold(BTreeMap::new(), |mut accounts: BTreeMap>, entry| { - let (BlockNumberAddress((_, address)), storage_entry) = entry?; - accounts.entry(address).or_default().insert(storage_entry.key); - Ok(accounts) - }) - } - - /// Get plainstate storages - #[allow(clippy::type_complexity)] - pub fn get_plainstate_storages( - &self, - iter: impl IntoIterator)>, - ) -> std::result::Result)>, TransactionError> { - let mut plain_storage = self.tx.cursor_dup_read::()?; - - iter.into_iter() - .map(|(address, storage)| { - storage - .into_iter() - .map(|key| -> std::result::Result<_, TransactionError> { - let ret = plain_storage - .seek_by_key_subkey(address, key)? - .filter(|v| v.key == key) - .unwrap_or_default(); - Ok((key, ret.value)) - }) - .collect::, _>>() - .map(|storage| (address, storage)) - }) - .collect::, _>>() - } - - /// Get all block numbers where account got changed. - /// - /// NOTE: Get inclusive range of blocks. - pub fn get_storage_block_numbers_from_changesets( - &self, - range: RangeInclusive, - ) -> std::result::Result>, TransactionError> { - let mut changeset_cursor = self.tx.cursor_read::()?; - - let storage_changeset_lists = - changeset_cursor.walk_range(BlockNumberAddress::range(range))?.try_fold( - BTreeMap::new(), - |mut storages: BTreeMap<(Address, H256), Vec>, - entry| - -> std::result::Result<_, TransactionError> { - let (index, storage) = entry?; - storages - .entry((index.address(), storage.key)) - .or_default() - .push(index.block_number()); - Ok(storages) - }, - )?; - - Ok(storage_changeset_lists) - } } impl<'this, TX: DbTxMut<'this> + DbTx<'this>> DatabaseProvider<'this, TX> { @@ -291,7 +220,7 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> DatabaseProvider<'this, TX> { &self, chain_spec: &ChainSpec, range: RangeInclusive, - ) -> std::result::Result, TransactionError> { + ) -> Result> { self.get_take_block_and_execution_range::(chain_spec, range) } @@ -300,107 +229,10 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> DatabaseProvider<'this, TX> { &self, chain_spec: &ChainSpec, range: RangeInclusive, - ) -> std::result::Result, TransactionError> { + ) -> Result> { self.get_take_block_and_execution_range::(chain_spec, range) } - /// Unwind and clear storage hashing - pub fn unwind_storage_hashing( - &self, - range: Range, - ) -> std::result::Result<(), TransactionError> { - let mut hashed_storage = self.tx.cursor_dup_write::()?; - - // Aggregate all block changesets and make list of accounts that have been changed. - self.tx - .cursor_read::()? - .walk_range(range)? - .collect::, _>>()? - .into_iter() - .rev() - // fold all account to get the old balance/nonces and account that needs to be removed - .fold( - BTreeMap::new(), - |mut accounts: BTreeMap<(Address, H256), U256>, - (BlockNumberAddress((_, address)), storage_entry)| { - accounts.insert((address, storage_entry.key), storage_entry.value); - accounts - }, - ) - .into_iter() - // hash addresses and collect it inside sorted BTreeMap. - // We are doing keccak only once per address. - .map(|((address, key), value)| ((keccak256(address), keccak256(key)), value)) - .collect::>() - .into_iter() - // Apply values to HashedStorage (if Value is zero just remove it); - .try_for_each( - |((hashed_address, key), value)| -> std::result::Result<(), TransactionError> { - if hashed_storage - .seek_by_key_subkey(hashed_address, key)? - .filter(|entry| entry.key == key) - .is_some() - { - hashed_storage.delete_current()?; - } - - if value != U256::ZERO { - hashed_storage.upsert(hashed_address, StorageEntry { key, value })?; - } - Ok(()) - }, - )?; - - Ok(()) - } - - /// Unwind and clear storage history indices. - /// - /// Returns number of changesets walked. - pub fn unwind_storage_history_indices( - &self, - range: Range, - ) -> std::result::Result { - let storage_changesets = self - .tx - .cursor_read::()? - .walk_range(range)? - .collect::, _>>()?; - let changesets = storage_changesets.len(); - - let last_indices = storage_changesets - .into_iter() - // reverse so we can get lowest block number where we need to unwind account. - .rev() - // fold all storages and get last block number - .fold( - BTreeMap::new(), - |mut accounts: BTreeMap<(Address, H256), u64>, (index, storage)| { - // we just need address and lowest block number. - accounts.insert((index.address(), storage.key), index.block_number()); - accounts - }, - ); - - let mut cursor = self.tx.cursor_write::()?; - for ((address, storage_key), rem_index) in last_indices { - let shard_part = - unwind_storage_history_shards::(&mut cursor, address, storage_key, rem_index)?; - - // check last shard_part, if present, items needs to be reinserted. - if !shard_part.is_empty() { - // there are items in list - self.tx.put::( - StorageShardedKey::new(address, storage_key, u64::MAX), - BlockNumberList::new(shard_part) - .expect("There is at least one element in list and it is sorted."), - )?; - } - } - - Ok(changesets) - } - /// Traverse over changesets and plain state and recreate the [`PostState`]s for the given range /// of blocks. /// @@ -427,7 +259,7 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> DatabaseProvider<'this, TX> { fn get_take_block_execution_result_range( &self, range: RangeInclusive, - ) -> std::result::Result, TransactionError> { + ) -> Result> { if range.is_empty() { return Ok(Vec::new()) } @@ -587,7 +419,7 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> DatabaseProvider<'this, TX> { &self, chain_spec: &ChainSpec, range: RangeInclusive, - ) -> std::result::Result, TransactionError> { + ) -> Result> { if TAKE { let storage_range = BlockNumberAddress::range(range.clone()); @@ -598,7 +430,8 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> DatabaseProvider<'this, TX> { // merkle tree let (new_state_root, trie_updates) = - StateRoot::incremental_root_with_updates(&self.tx, range.clone())?; + StateRoot::incremental_root_with_updates(&self.tx, range.clone()) + .map_err(Into::::into)?; let parent_number = range.start().saturating_sub(1); let parent_state_root = self @@ -612,12 +445,13 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> DatabaseProvider<'this, TX> { let parent_hash = self .block_hash(parent_number)? .ok_or_else(|| ProviderError::HeaderNotFound(parent_number.into()))?; - return Err(TransactionError::UnwindStateRootMismatch { + return Err(ProviderError::UnwindStateRootMismatch { got: new_state_root, expected: parent_state_root, block_number: parent_number, block_hash: parent_hash, - }) + } + .into()) } trie_updates.flush(&self.tx)?; } @@ -675,8 +509,7 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> DatabaseProvider<'this, TX> { fn get_take_block_transaction_range( &self, range: impl RangeBounds + Clone, - ) -> std::result::Result)>, TransactionError> - { + ) -> Result)>> { // Raad range of block bodies to get all transactions id's of this range. let block_bodies = self.get_or_take::(range)?; @@ -755,7 +588,7 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> DatabaseProvider<'this, TX> { &self, chain_spec: &ChainSpec, range: impl RangeBounds + Clone, - ) -> std::result::Result, TransactionError> { + ) -> Result> { // For block we need Headers, Bodies, Uncles, withdrawals, Transactions, Signers let block_headers = self.get_or_take::(range.clone())?; @@ -837,51 +670,8 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> DatabaseProvider<'this, TX> { Ok(blocks) } - /// Insert storage change index to database. Used inside StorageHistoryIndex stage - pub fn insert_storage_history_index( - &self, - storage_transitions: BTreeMap<(Address, H256), Vec>, - ) -> std::result::Result<(), TransactionError> { - for ((address, storage_key), mut indices) in storage_transitions { - let mut last_shard = self.take_last_storage_shard(address, storage_key)?; - last_shard.append(&mut indices); - - // chunk indices and insert them in shards of N size. - let mut chunks = last_shard - .iter() - .chunks(storage_sharded_key::NUM_OF_INDICES_IN_SHARD) - .into_iter() - .map(|chunks| chunks.map(|i| *i as usize).collect::>()) - .collect::>(); - let last_chunk = chunks.pop(); - - // chunk indices and insert them in shards of N size. - chunks.into_iter().try_for_each(|list| { - self.tx.put::( - StorageShardedKey::new( - address, - storage_key, - *list.last().expect("Chuck does not return empty list") as BlockNumber, - ), - BlockNumberList::new(list).expect("Indices are presorted and not empty"), - ) - })?; - // Insert last list with u64::MAX - if let Some(last_list) = last_chunk { - self.tx.put::( - StorageShardedKey::new(address, storage_key, u64::MAX), - BlockNumberList::new(last_list).expect("Indices are presorted and not empty"), - )?; - } - } - Ok(()) - } - /// Query the block body by number. - pub fn block_body_indices( - &self, - number: BlockNumber, - ) -> std::result::Result { + pub fn block_body_indices(&self, number: BlockNumber) -> Result { let body = self .tx .get::(number)? @@ -948,11 +738,7 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> DatabaseProvider<'this, TX> { /// Load last shard and check if it is full and remove if it is not. If list is empty, last /// shard was full or there is no shards at all. - pub fn take_last_storage_shard( - &self, - address: Address, - storage_key: H256, - ) -> std::result::Result, TransactionError> { + pub fn take_last_storage_shard(&self, address: Address, storage_key: H256) -> Result> { let mut cursor = self.tx.cursor_read::()?; let last = cursor.seek_exact(StorageShardedKey::new(address, storage_key, u64::MAX))?; if let Some((storage_shard_key, list)) = last { @@ -963,44 +749,6 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> DatabaseProvider<'this, TX> { } Ok(Vec::new()) } - /// iterate over storages and insert them to hashing table - pub fn insert_storage_for_hashing( - &self, - storages: impl IntoIterator)>, - ) -> std::result::Result<(), TransactionError> { - // hash values - let hashed = storages.into_iter().fold(BTreeMap::new(), |mut map, (address, storage)| { - let storage = storage.into_iter().fold(BTreeMap::new(), |mut map, (key, value)| { - map.insert(keccak256(key), value); - map - }); - map.insert(keccak256(address), storage); - map - }); - - let mut hashed_storage = self.tx.cursor_dup_write::()?; - // Hash the address and key and apply them to HashedStorage (if Storage is None - // just remove it); - hashed.into_iter().try_for_each(|(hashed_address, storage)| { - storage.into_iter().try_for_each( - |(key, value)| -> std::result::Result<(), TransactionError> { - if hashed_storage - .seek_by_key_subkey(hashed_address, key)? - .filter(|entry| entry.key == key) - .is_some() - { - hashed_storage.delete_current()?; - } - - if value != U256::ZERO { - hashed_storage.upsert(hashed_address, StorageEntry { key, value })?; - } - Ok(()) - }, - ) - })?; - Ok(()) - } /// Append blocks and insert its post state. /// This will insert block data to all related tables and will update pipeline progress. @@ -1008,7 +756,7 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> DatabaseProvider<'this, TX> { &mut self, blocks: Vec, state: PostState, - ) -> std::result::Result<(), TransactionError> { + ) -> Result<()> { if blocks.is_empty() { return Ok(()) } @@ -1047,73 +795,10 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> DatabaseProvider<'this, TX> { &mut self, block: SealedBlock, senders: Option>, - ) -> std::result::Result<(), TransactionError> { + ) -> Result<()> { insert_canonical_block(self.tx_mut(), block, senders)?; Ok(()) } - - /// Read account/storage changesets and update account/storage history indices. - pub fn calculate_history_indices( - &mut self, - range: RangeInclusive, - ) -> std::result::Result<(), TransactionError> { - // account history stage - { - let indices = self.changed_accounts_and_blocks_with_range(range.clone())?; - self.insert_account_history_index(indices)?; - } - - // storage history stage - { - let indices = self.get_storage_block_numbers_from_changesets(range)?; - self.insert_storage_history_index(indices)?; - } - - Ok(()) - } - - /// Calculate the hashes of all changed accounts and storages, and finally calculate the state - /// root. - /// - /// The hashes are calculated from `fork_block_number + 1` to `current_block_number`. - /// - /// The resulting state root is compared with `expected_state_root`. - pub fn insert_hashes( - &mut self, - range: RangeInclusive, - end_block_hash: H256, - expected_state_root: H256, - ) -> std::result::Result<(), TransactionError> { - // storage hashing stage - { - let lists = self.get_addresses_and_keys_of_changed_storages(range.clone())?; - let storages = self.get_plainstate_storages(lists.into_iter())?; - self.insert_storage_for_hashing(storages.into_iter())?; - } - - // account hashing stage - { - let lists = self.changed_accounts_with_range(range.clone())?; - let accounts = self.basic_accounts(lists.into_iter())?; - self.insert_account_for_hashing(accounts.into_iter())?; - } - - // merkle tree - { - let (state_root, trie_updates) = - StateRoot::incremental_root_with_updates(&self.tx, range.clone())?; - if state_root != expected_state_root { - return Err(TransactionError::StateRootMismatch { - got: state_root, - expected: expected_state_root, - block_number: *range.end(), - block_hash: end_block_hash, - }) - } - trie_updates.flush(&self.tx)?; - } - Ok(()) - } } impl<'this, TX: DbTx<'this>> AccountReader for DatabaseProvider<'this, TX> { @@ -1816,3 +1501,282 @@ impl<'this, TX: DbTxMut<'this>> StageCheckpointWriter for DatabaseProvider<'this Ok(()) } } + +impl<'this, TX: DbTx<'this>> StorageReader for DatabaseProvider<'this, TX> { + fn plainstate_storages( + &self, + addresses_with_keys: impl IntoIterator)>, + ) -> Result)>> { + let mut plain_storage = self.tx.cursor_dup_read::()?; + + addresses_with_keys + .into_iter() + .map(|(address, storage)| { + storage + .into_iter() + .map(|key| -> Result<_> { + Ok(plain_storage + .seek_by_key_subkey(address, key)? + .filter(|v| v.key == key) + .unwrap_or_else(|| StorageEntry { key, value: Default::default() })) + }) + .collect::>>() + .map(|storage| (address, storage)) + }) + .collect::>>() + } + + fn changed_storages_with_range( + &self, + range: RangeInclusive, + ) -> Result>> { + self.tx + .cursor_read::()? + .walk_range(BlockNumberAddress::range(range))? + // fold all storages and save its old state so we can remove it from HashedStorage + // it is needed as it is dup table. + .try_fold(BTreeMap::new(), |mut accounts: BTreeMap>, entry| { + let (BlockNumberAddress((_, address)), storage_entry) = entry?; + accounts.entry(address).or_default().insert(storage_entry.key); + Ok(accounts) + }) + } + + fn changed_storages_and_blocks_with_range( + &self, + range: RangeInclusive, + ) -> Result>> { + let mut changeset_cursor = self.tx.cursor_read::()?; + + let storage_changeset_lists = + changeset_cursor.walk_range(BlockNumberAddress::range(range))?.try_fold( + BTreeMap::new(), + |mut storages: BTreeMap<(Address, H256), Vec>, entry| -> Result<_> { + let (index, storage) = entry?; + storages + .entry((index.address(), storage.key)) + .or_default() + .push(index.block_number()); + Ok(storages) + }, + )?; + + Ok(storage_changeset_lists) + } +} + +impl<'this, TX: DbTxMut<'this> + DbTx<'this>> HashingWriter for DatabaseProvider<'this, TX> { + fn insert_hashes( + &self, + range: RangeInclusive, + end_block_hash: H256, + expected_state_root: H256, + ) -> Result<()> { + // storage hashing stage + { + let lists = self.changed_storages_with_range(range.clone())?; + let storages = self.plainstate_storages(lists.into_iter())?; + self.insert_storage_for_hashing(storages.into_iter())?; + } + + // account hashing stage + { + let lists = self.changed_accounts_with_range(range.clone())?; + let accounts = self.basic_accounts(lists.into_iter())?; + self.insert_account_for_hashing(accounts.into_iter())?; + } + + // merkle tree + { + let (state_root, trie_updates) = + StateRoot::incremental_root_with_updates(&self.tx, range.clone()) + .map_err(Into::::into)?; + if state_root != expected_state_root { + return Err(ProviderError::StateRootMismatch { + got: state_root, + expected: expected_state_root, + block_number: *range.end(), + block_hash: end_block_hash, + } + .into()) + } + trie_updates.flush(&self.tx)?; + } + Ok(()) + } + + fn unwind_storage_hashing(&self, range: Range) -> Result<()> { + let mut hashed_storage = self.tx.cursor_dup_write::()?; + + // Aggregate all block changesets and make list of accounts that have been changed. + self.tx + .cursor_read::()? + .walk_range(range)? + .collect::, _>>()? + .into_iter() + .rev() + // fold all account to get the old balance/nonces and account that needs to be removed + .fold( + BTreeMap::new(), + |mut accounts: BTreeMap<(Address, H256), U256>, + (BlockNumberAddress((_, address)), storage_entry)| { + accounts.insert((address, storage_entry.key), storage_entry.value); + accounts + }, + ) + .into_iter() + // hash addresses and collect it inside sorted BTreeMap. + // We are doing keccak only once per address. + .map(|((address, key), value)| ((keccak256(address), keccak256(key)), value)) + .collect::>() + .into_iter() + // Apply values to HashedStorage (if Value is zero just remove it); + .try_for_each(|((hashed_address, key), value)| -> Result<()> { + if hashed_storage + .seek_by_key_subkey(hashed_address, key)? + .filter(|entry| entry.key == key) + .is_some() + { + hashed_storage.delete_current()?; + } + + if value != U256::ZERO { + hashed_storage.upsert(hashed_address, StorageEntry { key, value })?; + } + Ok(()) + })?; + + Ok(()) + } + fn insert_storage_for_hashing( + &self, + storages: impl IntoIterator)>, + ) -> Result<()> { + // hash values + let hashed = storages.into_iter().fold(BTreeMap::new(), |mut map, (address, storage)| { + let storage = storage.into_iter().fold(BTreeMap::new(), |mut map, entry| { + map.insert(keccak256(entry.key), entry.value); + map + }); + map.insert(keccak256(address), storage); + map + }); + + let mut hashed_storage = self.tx.cursor_dup_write::()?; + // Hash the address and key and apply them to HashedStorage (if Storage is None + // just remove it); + hashed.into_iter().try_for_each(|(hashed_address, storage)| { + storage.into_iter().try_for_each(|(key, value)| -> Result<()> { + if hashed_storage + .seek_by_key_subkey(hashed_address, key)? + .filter(|entry| entry.key == key) + .is_some() + { + hashed_storage.delete_current()?; + } + + if value != U256::ZERO { + hashed_storage.upsert(hashed_address, StorageEntry { key, value })?; + } + Ok(()) + }) + })?; + Ok(()) + } +} + +impl<'this, TX: DbTxMut<'this> + DbTx<'this>> HistoryWriter for DatabaseProvider<'this, TX> { + fn calculate_history_indices(&self, range: RangeInclusive) -> Result<()> { + // account history stage + { + let indices = self.changed_accounts_and_blocks_with_range(range.clone())?; + self.insert_account_history_index(indices)?; + } + + // storage history stage + { + let indices = self.changed_storages_and_blocks_with_range(range)?; + self.insert_storage_history_index(indices)?; + } + + Ok(()) + } + fn insert_storage_history_index( + &self, + storage_transitions: BTreeMap<(Address, H256), Vec>, + ) -> Result<()> { + for ((address, storage_key), mut indices) in storage_transitions { + let mut last_shard = self.take_last_storage_shard(address, storage_key)?; + last_shard.append(&mut indices); + + // chunk indices and insert them in shards of N size. + let mut chunks = last_shard + .iter() + .chunks(storage_sharded_key::NUM_OF_INDICES_IN_SHARD) + .into_iter() + .map(|chunks| chunks.map(|i| *i as usize).collect::>()) + .collect::>(); + let last_chunk = chunks.pop(); + + // chunk indices and insert them in shards of N size. + chunks.into_iter().try_for_each(|list| { + self.tx.put::( + StorageShardedKey::new( + address, + storage_key, + *list.last().expect("Chuck does not return empty list") as BlockNumber, + ), + BlockNumberList::new(list).expect("Indices are presorted and not empty"), + ) + })?; + // Insert last list with u64::MAX + if let Some(last_list) = last_chunk { + self.tx.put::( + StorageShardedKey::new(address, storage_key, u64::MAX), + BlockNumberList::new(last_list).expect("Indices are presorted and not empty"), + )?; + } + } + Ok(()) + } + fn unwind_storage_history_indices(&self, range: Range) -> Result { + let storage_changesets = self + .tx + .cursor_read::()? + .walk_range(range)? + .collect::, _>>()?; + let changesets = storage_changesets.len(); + + let last_indices = storage_changesets + .into_iter() + // reverse so we can get lowest block number where we need to unwind account. + .rev() + // fold all storages and get last block number + .fold( + BTreeMap::new(), + |mut accounts: BTreeMap<(Address, H256), u64>, (index, storage)| { + // we just need address and lowest block number. + accounts.insert((index.address(), storage.key), index.block_number()); + accounts + }, + ); + + let mut cursor = self.tx.cursor_write::()?; + for ((address, storage_key), rem_index) in last_indices { + let shard_part = + unwind_storage_history_shards::(&mut cursor, address, storage_key, rem_index)?; + + // check last shard_part, if present, items needs to be reinserted. + if !shard_part.is_empty() { + // there are items in list + self.tx.put::( + StorageShardedKey::new(address, storage_key, u64::MAX), + BlockNumberList::new(shard_part) + .expect("There is at least one element in list and it is sorted."), + )?; + } + } + + Ok(changesets) + } +} diff --git a/crates/storage/provider/src/traits/hashing.rs b/crates/storage/provider/src/traits/hashing.rs new file mode 100644 index 0000000000..1ebe58bec7 --- /dev/null +++ b/crates/storage/provider/src/traits/hashing.rs @@ -0,0 +1,31 @@ +use auto_impl::auto_impl; +use reth_db::models::BlockNumberAddress; +use reth_interfaces::Result; +use reth_primitives::{Address, BlockNumber, StorageEntry, H256}; +use std::ops::{Range, RangeInclusive}; + +/// Hashing Writer +#[auto_impl(&, Arc, Box)] +pub trait HashingWriter: Send + Sync { + /// Unwind and clear storage hashing + fn unwind_storage_hashing(&self, range: Range) -> Result<()>; + + /// iterate over storages and insert them to hashing table + fn insert_storage_for_hashing( + &self, + storages: impl IntoIterator)>, + ) -> Result<()>; + + /// Calculate the hashes of all changed accounts and storages, and finally calculate the state + /// root. + /// + /// The hashes are calculated from `fork_block_number + 1` to `current_block_number`. + /// + /// The resulting state root is compared with `expected_state_root`. + fn insert_hashes( + &self, + range: RangeInclusive, + end_block_hash: H256, + expected_state_root: H256, + ) -> Result<()>; +} diff --git a/crates/storage/provider/src/traits/history.rs b/crates/storage/provider/src/traits/history.rs new file mode 100644 index 0000000000..d79bdd1c3f --- /dev/null +++ b/crates/storage/provider/src/traits/history.rs @@ -0,0 +1,26 @@ +use auto_impl::auto_impl; +use reth_db::models::BlockNumberAddress; +use reth_interfaces::Result; +use reth_primitives::{Address, BlockNumber, H256}; +use std::{ + collections::BTreeMap, + ops::{Range, RangeInclusive}, +}; + +/// History Writer +#[auto_impl(&, Arc, Box)] +pub trait HistoryWriter: Send + Sync { + /// Unwind and clear storage history indices. + /// + /// Returns number of changesets walked. + fn unwind_storage_history_indices(&self, range: Range) -> Result; + + /// Insert storage change index to database. Used inside StorageHistoryIndex stage + fn insert_storage_history_index( + &self, + storage_transitions: BTreeMap<(Address, H256), Vec>, + ) -> Result<()>; + + /// Read account/storage changesets and update account/storage history indices. + fn calculate_history_indices(&self, range: RangeInclusive) -> Result<()>; +} diff --git a/crates/storage/provider/src/traits/mod.rs b/crates/storage/provider/src/traits/mod.rs index 07b918a01a..c4805a9cd6 100644 --- a/crates/storage/provider/src/traits/mod.rs +++ b/crates/storage/provider/src/traits/mod.rs @@ -3,6 +3,9 @@ mod account; pub use account::{AccountExtReader, AccountReader, AccountWriter}; +mod storage; +pub use storage::StorageReader; + mod block; pub use block::{BlockProvider, BlockProviderIdExt, BlockSource}; @@ -47,3 +50,9 @@ pub use chain::{ mod stage_checkpoint; pub use stage_checkpoint::{StageCheckpointReader, StageCheckpointWriter}; + +mod hashing; +pub use hashing::HashingWriter; + +mod history; +pub use history::HistoryWriter; diff --git a/crates/storage/provider/src/traits/storage.rs b/crates/storage/provider/src/traits/storage.rs new file mode 100644 index 0000000000..87782ba9bf --- /dev/null +++ b/crates/storage/provider/src/traits/storage.rs @@ -0,0 +1,33 @@ +use std::{ + collections::{BTreeMap, BTreeSet}, + ops::RangeInclusive, +}; + +use auto_impl::auto_impl; +use reth_interfaces::Result; +use reth_primitives::{Address, BlockNumber, StorageEntry, H256}; + +/// Storage reader +#[auto_impl(&, Arc, Box)] +pub trait StorageReader: Send + Sync { + /// Get plainstate storages for addresses and storage keys. + fn plainstate_storages( + &self, + addresses_with_keys: impl IntoIterator)>, + ) -> Result)>>; + + /// Iterate over storage changesets and return all storage slots that were changed. + fn changed_storages_with_range( + &self, + range: RangeInclusive, + ) -> Result>>; + + /// Iterate over storage changesets and return all storage slots that were changed alongside + /// each specific set of blocks. + /// + /// NOTE: Get inclusive range of blocks. + fn changed_storages_and_blocks_with_range( + &self, + range: RangeInclusive, + ) -> Result>>; +} diff --git a/crates/storage/provider/src/transaction.rs b/crates/storage/provider/src/transaction.rs index 9614f5e1f9..c64ae629f3 100644 --- a/crates/storage/provider/src/transaction.rs +++ b/crates/storage/provider/src/transaction.rs @@ -3,47 +3,6 @@ use reth_primitives::{BlockHash, BlockNumber, H256}; use reth_trie::StateRootError; use std::fmt::Debug; -/// An error that can occur when using the transaction container -#[derive(Debug, PartialEq, Eq, Clone, thiserror::Error)] -pub enum TransactionError { - /// The transaction encountered a database error. - #[error(transparent)] - Database(#[from] DbError), - /// The transaction encountered a database integrity error. - #[error(transparent)] - DatabaseIntegrity(#[from] ProviderError), - /// The trie error. - #[error(transparent)] - TrieError(#[from] StateRootError), - /// Root mismatch - #[error("Merkle trie root mismatch at #{block_number} ({block_hash:?}). Got: {got:?}. Expected: {expected:?}")] - StateRootMismatch { - /// Expected root - expected: H256, - /// Calculated root - got: H256, - /// Block number - block_number: BlockNumber, - /// Block hash - block_hash: BlockHash, - }, - /// Root mismatch during unwind - #[error("Unwind merkle trie root mismatch at #{block_number} ({block_hash:?}). Got: {got:?}. Expected: {expected:?}")] - UnwindStateRootMismatch { - /// Expected root - expected: H256, - /// Calculated root - got: H256, - /// Target block number - block_number: BlockNumber, - /// Block hash - block_hash: BlockHash, - }, - /// Internal interfaces error - #[error("Internal error")] - InternalError(#[from] reth_interfaces::Error), -} - #[cfg(test)] mod test { use crate::{ diff --git a/crates/transaction-pool/src/pool/txpool.rs b/crates/transaction-pool/src/pool/txpool.rs index e9d370c0ab..d32fe02e27 100644 --- a/crates/transaction-pool/src/pool/txpool.rs +++ b/crates/transaction-pool/src/pool/txpool.rs @@ -1443,11 +1443,10 @@ mod tests { let mut pool = AllTransactions::default(); let tx = MockTransaction::eip1559().inc_price().inc_limit(); let first = f.validated(tx.clone()); - let _res = pool.insert_tx(first.clone(), on_chain_balance, on_chain_nonce); + let _res = pool.insert_tx(first, on_chain_balance, on_chain_nonce); let mut replacement = f.validated(tx.rng_hash()); replacement.transaction = replacement.transaction.decr_price(); - let err = - pool.insert_tx(replacement.clone(), on_chain_balance, on_chain_nonce).unwrap_err(); + let err = pool.insert_tx(replacement, on_chain_balance, on_chain_nonce).unwrap_err(); assert!(matches!(err, InsertErr::Underpriced { .. })); }