From 3e07a5d508bce530b8aa3ecb35f07a1306676234 Mon Sep 17 00:00:00 2001
From: joshieDo <93316087+joshieDo@users.noreply.github.com>
Date: Fri, 23 Jun 2023 11:55:07 +0100
Subject: [PATCH] chore: move `AccountWriter` methods to `HashingWriter` and
 `HistoryWriter` (#3332)

Co-authored-by: Roman Krasiuk
---
 crates/staged-sync/src/utils/init.rs           |   4 +-
 crates/stages/src/stages/hashing_account.rs    |   2 +-
 .../src/stages/index_account_history.rs        |   2 +-
 crates/storage/provider/src/lib.rs             |   4 +-
 .../src/providers/database/provider.rs         | 313 +++++++++----------
 crates/storage/provider/src/traits/account.rs  |  24 --
 crates/storage/provider/src/traits/hashing.rs  |  11 +-
 crates/storage/provider/src/traits/history.rs  |  11 +
 crates/storage/provider/src/traits/mod.rs      |   2 +-
 9 files changed, 184 insertions(+), 189 deletions(-)

diff --git a/crates/staged-sync/src/utils/init.rs b/crates/staged-sync/src/utils/init.rs
index e146aab745..aadf2d6d90 100644
--- a/crates/staged-sync/src/utils/init.rs
+++ b/crates/staged-sync/src/utils/init.rs
@@ -9,9 +9,7 @@ use reth_db::{
     version::{check_db_version_file, create_db_version_file, DatabaseVersionError},
 };
 use reth_primitives::{stage::StageId, Account, Bytecode, ChainSpec, StorageEntry, H256, U256};
-use reth_provider::{
-    AccountWriter, DatabaseProviderRW, HashingWriter, HistoryWriter, PostState, ProviderFactory,
-};
+use reth_provider::{DatabaseProviderRW, HashingWriter, HistoryWriter, PostState, ProviderFactory};
 use std::{collections::BTreeMap, fs, path::Path, sync::Arc};
 use tracing::debug;
 
diff --git a/crates/stages/src/stages/hashing_account.rs b/crates/stages/src/stages/hashing_account.rs
index 84485c7454..79b93cb2da 100644
--- a/crates/stages/src/stages/hashing_account.rs
+++ b/crates/stages/src/stages/hashing_account.rs
@@ -16,7 +16,7 @@ use reth_primitives::{
         StageId,
     },
 };
-use reth_provider::{AccountExtReader, AccountWriter, DatabaseProviderRW};
+use reth_provider::{AccountExtReader, DatabaseProviderRW, HashingWriter};
 use std::{
     cmp::max,
     fmt::Debug,
diff --git a/crates/stages/src/stages/index_account_history.rs b/crates/stages/src/stages/index_account_history.rs
index 30bc39aec8..34848e71f0 100644
--- a/crates/stages/src/stages/index_account_history.rs
+++ b/crates/stages/src/stages/index_account_history.rs
@@ -1,7 +1,7 @@
 use crate::{ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput};
 use reth_db::database::Database;
 use reth_primitives::stage::{StageCheckpoint, StageId};
-use reth_provider::{AccountExtReader, AccountWriter, DatabaseProviderRW};
+use reth_provider::{AccountExtReader, DatabaseProviderRW, HistoryWriter};
 use std::fmt::Debug;
 
 /// Stage is indexing history the account changesets generated in
diff --git a/crates/storage/provider/src/lib.rs b/crates/storage/provider/src/lib.rs
index a0c2055684..4e6485fb2a 100644
--- a/crates/storage/provider/src/lib.rs
+++ b/crates/storage/provider/src/lib.rs
@@ -15,8 +15,8 @@
 /// Various provider traits.
 mod traits;
 pub use traits::{
-    AccountExtReader, AccountReader, AccountWriter, BlockExecutor, BlockHashProvider,
-    BlockIdProvider, BlockNumProvider, BlockProvider, BlockProviderIdExt, BlockSource,
+    AccountExtReader, AccountReader, BlockExecutor, BlockHashProvider, BlockIdProvider,
+    BlockNumProvider, BlockProvider, BlockProviderIdExt, BlockSource,
     BlockchainTreePendingStateProvider, CanonChainTracker, CanonStateNotification,
     CanonStateNotificationSender, CanonStateNotifications, CanonStateSubscriptions, EvmEnvProvider,
     ExecutorFactory, HashingWriter, HeaderProvider, HistoryWriter, PostStateDataProvider,
diff --git a/crates/storage/provider/src/providers/database/provider.rs b/crates/storage/provider/src/providers/database/provider.rs
index 86e7429fe7..766975fbae 100644
--- a/crates/storage/provider/src/providers/database/provider.rs
+++ b/crates/storage/provider/src/providers/database/provider.rs
@@ -2,9 +2,9 @@ use crate::{
     insert_canonical_block,
     post_state::StorageChangeset,
     traits::{AccountExtReader, BlockSource, ReceiptProvider, StageCheckpointWriter},
-    AccountReader, AccountWriter, BlockHashProvider, BlockNumProvider, BlockProvider,
-    EvmEnvProvider, HashingWriter, HeaderProvider, HistoryWriter, PostState, ProviderError,
-    StageCheckpointReader, StorageReader, TransactionsProvider, WithdrawalsProvider,
+    AccountReader, BlockHashProvider, BlockNumProvider, BlockProvider, EvmEnvProvider,
+    HashingWriter, HeaderProvider, HistoryWriter, PostState, ProviderError, StageCheckpointReader,
+    StorageReader, TransactionsProvider, WithdrawalsProvider,
 };
 use itertools::{izip, Itertools};
 use reth_db::{
@@ -851,159 +851,6 @@ impl<'this, TX: DbTx<'this>> AccountExtReader for DatabaseProvider<'this, TX> {
     }
 }
 
-impl<'this, TX: DbTxMut<'this> + DbTx<'this>> AccountWriter for DatabaseProvider<'this, TX> {
-    fn unwind_account_hashing(&self, range: RangeInclusive<BlockNumber>) -> Result<()> {
-        let mut hashed_accounts = self.tx.cursor_write::<tables::HashedAccount>()?;
-
-        // Aggregate all block changesets and make a list of accounts that have been changed.
-        self.tx
-            .cursor_read::<tables::AccountChangeSet>()?
-            .walk_range(range)?
-            .collect::<Result<Vec<_>, _>>()?
-            .into_iter()
-            .rev()
-            // fold all account to get the old balance/nonces and account that needs to be removed
-            .fold(
-                BTreeMap::new(),
-                |mut accounts: BTreeMap<Address, Option<Account>>, (_, account_before)| {
-                    accounts.insert(account_before.address, account_before.info);
-                    accounts
-                },
-            )
-            .into_iter()
-            // hash addresses and collect it inside sorted BTreeMap.
-            // We are doing keccak only once per address.
-            .map(|(address, account)| (keccak256(address), account))
-            .collect::<BTreeMap<_, _>>()
-            .into_iter()
-            // Apply values to HashedState (if Account is None remove it);
-            .try_for_each(|(hashed_address, account)| -> Result<()> {
-                if let Some(account) = account {
-                    hashed_accounts.upsert(hashed_address, account)?;
-                } else if hashed_accounts.seek_exact(hashed_address)?.is_some() {
-                    hashed_accounts.delete_current()?;
-                }
-                Ok(())
-            })?;
-
-        Ok(())
-    }
-
-    fn unwind_account_history_indices(&self, range: RangeInclusive<BlockNumber>) -> Result<usize> {
-        let account_changeset = self
-            .tx
-            .cursor_read::<tables::AccountChangeSet>()?
-            .walk_range(range)?
-            .collect::<Result<Vec<_>, _>>()?;
-        let changesets = account_changeset.len();
-
-        let last_indices = account_changeset
-            .into_iter()
-            // reverse so we can get lowest block number where we need to unwind account.
-            .rev()
-            // fold all account and get last block number
-            .fold(BTreeMap::new(), |mut accounts: BTreeMap<Address, u64>, (index, account)| {
-                // we just need address and lowest block number.
-                accounts.insert(account.address, index);
-                accounts
-            });
-        // try to unwind the index
-        let mut cursor = self.tx.cursor_write::<tables::AccountHistory>()?;
-        for (address, rem_index) in last_indices {
-            let shard_part = unwind_account_history_shards::<TX>(&mut cursor, address, rem_index)?;
-
-            // check last shard_part, if present, items needs to be reinserted.
-            if !shard_part.is_empty() {
-                // there are items in list
-                self.tx.put::<tables::AccountHistory>(
-                    ShardedKey::new(address, u64::MAX),
-                    BlockNumberList::new(shard_part)
-                        .expect("There is at least one element in list and it is sorted."),
-                )?;
-            }
-        }
-
-        Ok(changesets)
-    }
-
-    fn insert_account_history_index(
-        &self,
-        account_transitions: BTreeMap<Address, Vec<u64>>,
-    ) -> Result<()> {
-        // insert indexes to AccountHistory.
-        for (address, mut indices) in account_transitions {
-            // Load last shard and check if it is full and remove if it is not. If list is empty,
-            // last shard was full or there is no shards at all.
-            let mut last_shard = {
-                let mut cursor = self.tx.cursor_read::<tables::AccountHistory>()?;
-                let last = cursor.seek_exact(ShardedKey::new(address, u64::MAX))?;
-                if let Some((shard_key, list)) = last {
-                    // delete old shard so new one can be inserted.
-                    self.tx.delete::<tables::AccountHistory>(shard_key, None)?;
-                    let list = list.iter(0).map(|i| i as u64).collect::<Vec<_>>();
-                    list
-                } else {
-                    Vec::new()
-                }
-            };
-
-            last_shard.append(&mut indices);
-            // chunk indices and insert them in shards of N size.
-            let mut chunks = last_shard
-                .iter()
-                .chunks(sharded_key::NUM_OF_INDICES_IN_SHARD)
-                .into_iter()
-                .map(|chunks| chunks.map(|i| *i as usize).collect::<Vec<usize>>())
-                .collect::<Vec<_>>();
-            let last_chunk = chunks.pop();
-
-            chunks.into_iter().try_for_each(|list| {
-                self.tx.put::<tables::AccountHistory>(
-                    ShardedKey::new(
-                        address,
-                        *list.last().expect("Chuck does not return empty list") as BlockNumber,
-                    ),
-                    BlockNumberList::new(list).expect("Indices are presorted and not empty"),
-                )
-            })?;
-
-            // Insert last list with u64::MAX
-            if let Some(last_list) = last_chunk {
-                self.tx.put::<tables::AccountHistory>(
-                    ShardedKey::new(address, u64::MAX),
-                    BlockNumberList::new(last_list).expect("Indices are presorted and not empty"),
-                )?
-            }
-        }
-        Ok(())
-    }
-
-    fn insert_account_for_hashing(
-        &self,
-        accounts: impl IntoIterator<Item = (Address, Option<Account>)>,
-    ) -> Result<()> {
-        let mut hashed_accounts = self.tx.cursor_write::<tables::HashedAccount>()?;
-
-        let hashes_accounts = accounts.into_iter().fold(
-            BTreeMap::new(),
-            |mut map: BTreeMap<H256, Option<Account>>, (address, account)| {
-                map.insert(keccak256(address), account);
-                map
-            },
-        );
-
-        hashes_accounts.into_iter().try_for_each(|(hashed_address, account)| -> Result<()> {
-            if let Some(account) = account {
-                hashed_accounts.upsert(hashed_address, account)?
-            } else if hashed_accounts.seek_exact(hashed_address)?.is_some() {
-                hashed_accounts.delete_current()?;
-            }
-            Ok(())
-        })?;
-        Ok(())
-    }
-}
-
 impl<'this, TX: DbTx<'this>> HeaderProvider for DatabaseProvider<'this, TX> {
     fn header(&self, block_hash: &BlockHash) -> Result<Option<Header>> {
         if let Some(num) = self.block_number(*block_hash)? {
@@ -1648,6 +1495,7 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> HashingWriter for DatabaseProvider
 
         Ok(())
     }
+
     fn insert_storage_for_hashing(
         &self,
         storages: impl IntoIterator<Item = (Address, impl IntoIterator<Item = StorageEntry>)>,
@@ -1683,6 +1531,68 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> HashingWriter for DatabaseProvider
         })?;
         Ok(())
     }
+
+    fn unwind_account_hashing(&self, range: RangeInclusive<BlockNumber>) -> Result<()> {
+        let mut hashed_accounts = self.tx.cursor_write::<tables::HashedAccount>()?;
+
+        // Aggregate all block changesets and make a list of accounts that have been changed.
+        self.tx
+            .cursor_read::<tables::AccountChangeSet>()?
+            .walk_range(range)?
+            .collect::<Result<Vec<_>, _>>()?
+            .into_iter()
+            .rev()
+            // fold all account to get the old balance/nonces and account that needs to be removed
+            .fold(
+                BTreeMap::new(),
+                |mut accounts: BTreeMap<Address, Option<Account>>, (_, account_before)| {
+                    accounts.insert(account_before.address, account_before.info);
+                    accounts
+                },
+            )
+            .into_iter()
+            // hash addresses and collect it inside sorted BTreeMap.
+            // We are doing keccak only once per address.
+            .map(|(address, account)| (keccak256(address), account))
+            .collect::<BTreeMap<_, _>>()
+            .into_iter()
+            // Apply values to HashedState (if Account is None remove it);
+            .try_for_each(|(hashed_address, account)| -> Result<()> {
+                if let Some(account) = account {
+                    hashed_accounts.upsert(hashed_address, account)?;
+                } else if hashed_accounts.seek_exact(hashed_address)?.is_some() {
+                    hashed_accounts.delete_current()?;
+                }
+                Ok(())
+            })?;
+
+        Ok(())
+    }
+
+    fn insert_account_for_hashing(
+        &self,
+        accounts: impl IntoIterator<Item = (Address, Option<Account>)>,
+    ) -> Result<()> {
+        let mut hashed_accounts = self.tx.cursor_write::<tables::HashedAccount>()?;
+
+        let hashes_accounts = accounts.into_iter().fold(
+            BTreeMap::new(),
+            |mut map: BTreeMap<H256, Option<Account>>, (address, account)| {
+                map.insert(keccak256(address), account);
+                map
+            },
+        );
+
+        hashes_accounts.into_iter().try_for_each(|(hashed_address, account)| -> Result<()> {
+            if let Some(account) = account {
+                hashed_accounts.upsert(hashed_address, account)?
+            } else if hashed_accounts.seek_exact(hashed_address)?.is_some() {
+                hashed_accounts.delete_current()?;
+            }
+            Ok(())
+        })?;
+        Ok(())
+    }
 }
 
 impl<'this, TX: DbTxMut<'this> + DbTx<'this>> HistoryWriter for DatabaseProvider<'this, TX> {
@@ -1701,6 +1611,7 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> HistoryWriter for DatabaseProvider
 
         Ok(())
     }
+
     fn insert_storage_history_index(
         &self,
         storage_transitions: BTreeMap<(Address, H256), Vec<u64>>,
@@ -1739,6 +1650,7 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> HistoryWriter for DatabaseProvider
         }
         Ok(())
     }
+
     fn unwind_storage_history_indices(&self, range: Range<BlockNumberAddress>) -> Result<usize> {
         let storage_changesets = self
             .tx
@@ -1779,4 +1691,93 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> HistoryWriter for DatabaseProvider
 
         Ok(changesets)
     }
+
+    fn insert_account_history_index(
+        &self,
+        account_transitions: BTreeMap<Address, Vec<u64>>,
+    ) -> Result<()> {
+        // insert indexes to AccountHistory.
+        for (address, mut indices) in account_transitions {
+            // Load last shard and check if it is full and remove if it is not. If list is empty,
+            // last shard was full or there is no shards at all.
+            let mut last_shard = {
+                let mut cursor = self.tx.cursor_read::<tables::AccountHistory>()?;
+                let last = cursor.seek_exact(ShardedKey::new(address, u64::MAX))?;
+                if let Some((shard_key, list)) = last {
+                    // delete old shard so new one can be inserted.
+                    self.tx.delete::<tables::AccountHistory>(shard_key, None)?;
+                    let list = list.iter(0).map(|i| i as u64).collect::<Vec<_>>();
+                    list
+                } else {
+                    Vec::new()
+                }
+            };
+
+            last_shard.append(&mut indices);
+            // chunk indices and insert them in shards of N size.
+            let mut chunks = last_shard
+                .iter()
+                .chunks(sharded_key::NUM_OF_INDICES_IN_SHARD)
+                .into_iter()
+                .map(|chunks| chunks.map(|i| *i as usize).collect::<Vec<usize>>())
+                .collect::<Vec<_>>();
+            let last_chunk = chunks.pop();
+
+            chunks.into_iter().try_for_each(|list| {
+                self.tx.put::<tables::AccountHistory>(
+                    ShardedKey::new(
+                        address,
+                        *list.last().expect("Chuck does not return empty list") as BlockNumber,
+                    ),
+                    BlockNumberList::new(list).expect("Indices are presorted and not empty"),
+                )
+            })?;
+
+            // Insert last list with u64::MAX
+            if let Some(last_list) = last_chunk {
+                self.tx.put::<tables::AccountHistory>(
+                    ShardedKey::new(address, u64::MAX),
+                    BlockNumberList::new(last_list).expect("Indices are presorted and not empty"),
+                )?
+            }
+        }
+        Ok(())
+    }
+
+    fn unwind_account_history_indices(&self, range: RangeInclusive<BlockNumber>) -> Result<usize> {
+        let account_changeset = self
+            .tx
+            .cursor_read::<tables::AccountChangeSet>()?
+            .walk_range(range)?
+            .collect::<Result<Vec<_>, _>>()?;
+        let changesets = account_changeset.len();
+
+        let last_indices = account_changeset
+            .into_iter()
+            // reverse so we can get lowest block number where we need to unwind account.
+            .rev()
+            // fold all account and get last block number
+            .fold(BTreeMap::new(), |mut accounts: BTreeMap<Address, u64>, (index, account)| {
+                // we just need address and lowest block number.
+                accounts.insert(account.address, index);
+                accounts
+            });
+        // try to unwind the index
+        let mut cursor = self.tx.cursor_write::<tables::AccountHistory>()?;
+        for (address, rem_index) in last_indices {
+            let shard_part = unwind_account_history_shards::<TX>(&mut cursor, address, rem_index)?;
+
+            // check last shard_part, if present, items needs to be reinserted.
+            if !shard_part.is_empty() {
+                // there are items in list
+                self.tx.put::<tables::AccountHistory>(
+                    ShardedKey::new(address, u64::MAX),
+                    BlockNumberList::new(shard_part)
+                        .expect("There is at least one element in list and it is sorted."),
+                )?;
+            }
+        }
+
+        Ok(changesets)
+    }
 }
diff --git a/crates/storage/provider/src/traits/account.rs b/crates/storage/provider/src/traits/account.rs
index ceafaec245..d08d15a121 100644
--- a/crates/storage/provider/src/traits/account.rs
+++ b/crates/storage/provider/src/traits/account.rs
@@ -42,27 +42,3 @@ pub trait AccountExtReader: Send + Sync {
         range: RangeInclusive<BlockNumber>,
     ) -> Result<BTreeMap<Address, Vec<u64>>>;
 }
-
-/// Account reader
-#[auto_impl(&, Arc, Box)]
-pub trait AccountWriter: Send + Sync {
-    /// Unwind and clear account hashing
-    fn unwind_account_hashing(&self, range: RangeInclusive<BlockNumber>) -> Result<()>;
-
-    /// Unwind and clear account history indices.
-    ///
-    /// Returns number of changesets walked.
-    fn unwind_account_history_indices(&self, range: RangeInclusive<BlockNumber>) -> Result<usize>;
-
-    /// Insert account change index to database. Used inside AccountHistoryIndex stage
-    fn insert_account_history_index(
-        &self,
-        account_transitions: BTreeMap<Address, Vec<u64>>,
-    ) -> Result<()>;
-
-    /// iterate over accounts and insert them to hashing table
-    fn insert_account_for_hashing(
-        &self,
-        accounts: impl IntoIterator<Item = (Address, Option<Account>)>,
-    ) -> Result<()>;
-}
diff --git a/crates/storage/provider/src/traits/hashing.rs b/crates/storage/provider/src/traits/hashing.rs
index 1ebe58bec7..dc327ca09c 100644
--- a/crates/storage/provider/src/traits/hashing.rs
+++ b/crates/storage/provider/src/traits/hashing.rs
@@ -1,12 +1,21 @@
 use auto_impl::auto_impl;
 use reth_db::models::BlockNumberAddress;
 use reth_interfaces::Result;
-use reth_primitives::{Address, BlockNumber, StorageEntry, H256};
+use reth_primitives::{Account, Address, BlockNumber, StorageEntry, H256};
 use std::ops::{Range, RangeInclusive};
 
 /// Hashing Writer
 #[auto_impl(&, Arc, Box)]
 pub trait HashingWriter: Send + Sync {
+    /// Unwind and clear account hashing
+    fn unwind_account_hashing(&self, range: RangeInclusive<BlockNumber>) -> Result<()>;
+
+    /// Inserts all accounts into [reth_db::tables::AccountHistory] table.
+    fn insert_account_for_hashing(
+        &self,
+        accounts: impl IntoIterator<Item = (Address, Option<Account>)>,
+    ) -> Result<()>;
+
     /// Unwind and clear storage hashing
     fn unwind_storage_hashing(&self, range: Range<BlockNumberAddress>) -> Result<()>;
 
diff --git a/crates/storage/provider/src/traits/history.rs b/crates/storage/provider/src/traits/history.rs
index d79bdd1c3f..af391c4a05 100644
--- a/crates/storage/provider/src/traits/history.rs
+++ b/crates/storage/provider/src/traits/history.rs
@@ -10,6 +10,17 @@ use std::{
 /// History Writer
 #[auto_impl(&, Arc, Box)]
 pub trait HistoryWriter: Send + Sync {
+    /// Unwind and clear account history indices.
+    ///
+    /// Returns number of changesets walked.
+    fn unwind_account_history_indices(&self, range: RangeInclusive<BlockNumber>) -> Result<usize>;
+
+    /// Insert account change index to database. Used inside AccountHistoryIndex stage
+    fn insert_account_history_index(
+        &self,
+        account_transitions: BTreeMap<Address, Vec<u64>>,
+    ) -> Result<()>;
+
     /// Unwind and clear storage history indices.
     ///
     /// Returns number of changesets walked.
diff --git a/crates/storage/provider/src/traits/mod.rs b/crates/storage/provider/src/traits/mod.rs
index c4805a9cd6..81ad694a57 100644
--- a/crates/storage/provider/src/traits/mod.rs
+++ b/crates/storage/provider/src/traits/mod.rs
@@ -1,7 +1,7 @@
 //! Collection of common provider traits.
 
 mod account;
-pub use account::{AccountExtReader, AccountReader, AccountWriter};
+pub use account::{AccountExtReader, AccountReader};
 
 mod storage;
 pub use storage::StorageReader;
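
For illustration, a minimal sketch (not part of the diff) of how a caller that previously bound `AccountWriter` can bind the split traits after this change. The helper name `write_account_state` and its arguments are assumptions made for the example; only the trait names and method signatures come from the patch above.

    use reth_primitives::{Account, Address};
    use reth_provider::{HashingWriter, HistoryWriter};
    use std::collections::BTreeMap;

    // Hypothetical helper: any provider implementing both writer traits can
    // populate the hashed-account table and the account history index.
    fn write_account_state<P: HashingWriter + HistoryWriter>(
        provider: &P,
        accounts: Vec<(Address, Option<Account>)>,
        history: BTreeMap<Address, Vec<u64>>,
    ) -> reth_interfaces::Result<()> {
        // Formerly `AccountWriter::insert_account_for_hashing`.
        provider.insert_account_for_hashing(accounts)?;
        // Formerly `AccountWriter::insert_account_history_index`.
        provider.insert_account_history_index(history)?;
        Ok(())
    }

Call sites that needed only one of the two concerns (the hashed tables or the history indices) can now depend on the narrower trait, which is what the stage and init imports above switch to.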