refactor: adds StorageReader / HashingWriter / HistoryWriter (#3285)

joshieDo
2023-06-22 15:08:27 +01:00
committed by GitHub
parent a9147ba2fc
commit 68b93a88de
15 changed files with 442 additions and 407 deletions

Cargo.lock (generated, 1 change)
View File

@@ -5558,7 +5558,6 @@ dependencies = [
"reth-revm-primitives",
"reth-rlp",
"reth-trie",
"thiserror",
"tokio",
"tokio-stream",
"tracing",

View File

@@ -4,6 +4,8 @@ use reth_primitives::{Address, BlockHash, BlockHashOrNumber, BlockNumber, TxNumb
#[allow(missing_docs)]
#[derive(Debug, thiserror::Error, PartialEq, Eq, Clone)]
pub enum ProviderError {
#[error(transparent)]
Database(#[from] crate::db::DatabaseError),
/// The header number was not found for the given block hash.
#[error("Block hash {0:?} does not exist in Headers table")]
BlockHashNotFound(BlockHash),
@@ -68,4 +70,28 @@ pub enum ProviderError {
/// Unable to find the block number for a given transaction index
#[error("Unable to find the block number for a given transaction index")]
BlockNumberForTransactionIndexNotFound,
/// Root mismatch
#[error("Merkle trie root mismatch at #{block_number} ({block_hash:?}). Got: {got:?}. Expected: {expected:?}")]
StateRootMismatch {
/// Expected root
expected: H256,
/// Calculated root
got: H256,
/// Block number
block_number: BlockNumber,
/// Block hash
block_hash: BlockHash,
},
/// Root mismatch during unwind
#[error("Unwind merkle trie root mismatch at #{block_number} ({block_hash:?}). Got: {got:?}. Expected: {expected:?}")]
UnwindStateRootMismatch {
/// Expected root
expected: H256,
/// Calculated root
got: H256,
/// Target block number
block_number: BlockNumber,
/// Block hash
block_hash: BlockHash,
},
}
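With `TransactionError` removed later in this diff, callers match the mismatch variants directly on `ProviderError`. A minimal sketch of such a match (the helper function is hypothetical; only the error type and its variants come from this change):

use reth_interfaces::provider::ProviderError;

/// Hypothetical helper: report either root-mismatch variant uniformly.
fn report_root_mismatch(err: &ProviderError) {
    match err {
        ProviderError::StateRootMismatch { expected, got, block_number, block_hash } |
        ProviderError::UnwindStateRootMismatch { expected, got, block_number, block_hash } => {
            eprintln!(
                "root mismatch at #{block_number} ({block_hash:?}): got {got:?}, expected {expected:?}"
            );
        }
        other => eprintln!("provider error: {other}"),
    }
}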

View File

@@ -8,10 +8,8 @@ use reth_db::{
transaction::{DbTx, DbTxMut},
version::{check_db_version_file, create_db_version_file, DatabaseVersionError},
};
use reth_primitives::{stage::StageId, Account, Bytecode, ChainSpec, H256, U256};
use reth_provider::{
AccountWriter, DatabaseProviderRW, PostState, ProviderFactory, TransactionError,
};
use reth_primitives::{stage::StageId, Account, Bytecode, ChainSpec, StorageEntry, H256, U256};
use reth_provider::{AccountWriter, DatabaseProviderRW, HashingWriter, PostState, ProviderFactory};
use std::{fs, path::Path, sync::Arc};
use tracing::debug;
@@ -50,10 +48,6 @@ pub enum InitDatabaseError {
database_hash: H256,
},
/// Higher level error encountered when using a Transaction.
#[error(transparent)]
TransactionError(#[from] TransactionError),
/// Low-level database error.
#[error(transparent)]
DBError(#[from] reth_db::DatabaseError),
@@ -155,7 +149,12 @@ pub fn insert_genesis_hashes<DB: Database>(
let alloc_storage = genesis.alloc.clone().into_iter().filter_map(|(addr, account)| {
// only return Some if there is storage
account.storage.map(|storage| (addr, storage.into_iter().map(|(k, v)| (k, v.into()))))
account.storage.map(|storage| {
(
addr,
storage.into_iter().map(|(key, value)| StorageEntry { key, value: value.into() }),
)
})
});
provider.insert_storage_for_hashing(alloc_storage)?;
provider.commit()?;
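For illustration, `insert_storage_for_hashing` now consumes `(Address, impl IntoIterator<Item = StorageEntry>)` pairs. A standalone sketch with placeholder values (in `insert_genesis_hashes` they come from the genesis alloc):

use reth_primitives::{Address, StorageEntry, H256, U256};

/// Hypothetical builder for a single (address, storage entries) pair.
/// Zero values are treated as deletions by the hashing writer.
fn example_alloc_storage() -> impl IntoIterator<Item = (Address, Vec<StorageEntry>)> {
    let address = Address::default();
    let entries = vec![StorageEntry { key: H256::default(), value: U256::from(1u64) }];
    std::iter::once((address, entries))
}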

View File

@@ -4,7 +4,6 @@ use reth_interfaces::{
provider::ProviderError,
};
use reth_primitives::SealedHeader;
use reth_provider::TransactionError;
use thiserror::Error;
use tokio::sync::mpsc::error::SendError;
@@ -59,9 +58,6 @@ pub enum StageError {
/// The stage encountered a database integrity error.
#[error("A database integrity error occurred: {0}")]
DatabaseIntegrity(#[from] ProviderError),
/// The stage encountered an error related to the current database transaction.
#[error("A database transaction error occurred: {0}")]
Transaction(#[from] TransactionError),
/// Invalid download response. Applicable for stages which
/// rely on external downloaders
#[error("Invalid download response: {0}")]
@@ -92,8 +88,7 @@ impl StageError {
StageError::DatabaseIntegrity(_) |
StageError::StageCheckpoint(_) |
StageError::ChannelClosed |
StageError::Fatal(_) |
StageError::Transaction(_)
StageError::Fatal(_)
)
}
}

View File

@@ -16,7 +16,7 @@ use reth_primitives::{
},
StorageEntry,
};
use reth_provider::DatabaseProviderRW;
use reth_provider::{DatabaseProviderRW, HashingWriter, StorageReader};
use std::{collections::BTreeMap, fmt::Debug};
use tracing::*;
@@ -171,12 +171,11 @@ impl<DB: Database> Stage<DB> for StorageHashingStage {
} else {
// Aggregate all changesets and make a list of the storages that have been
// changed.
let lists =
provider.get_addresses_and_keys_of_changed_storages(from_block..=to_block)?;
let lists = provider.changed_storages_with_range(from_block..=to_block)?;
// iterate over plain state and get newest storage value.
// The assumption we accept is that the plain state represents the
// `previous_stage_progress` state.
let storages = provider.get_plainstate_storages(lists)?;
let storages = provider.plainstate_storages(lists)?;
provider.insert_storage_for_hashing(storages.into_iter())?;
}
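Spelled out with the new trait bounds, the flow above reduces to three calls. A sketch (the helper name is made up; the methods are the ones introduced in this diff):

use reth_interfaces::Result;
use reth_primitives::BlockNumber;
use reth_provider::{HashingWriter, StorageReader};

/// Hypothetical helper mirroring the stage body: find every storage slot
/// changed in the range, read its current plain-state value, and hash it.
fn hash_changed_storages<P: StorageReader + HashingWriter>(
    provider: &P,
    from_block: BlockNumber,
    to_block: BlockNumber,
) -> Result<()> {
    let lists = provider.changed_storages_with_range(from_block..=to_block)?;
    let storages = provider.plainstate_storages(lists)?;
    provider.insert_storage_for_hashing(storages.into_iter())?;
    Ok(())
}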

View File

@@ -1,7 +1,7 @@
use crate::{ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput};
use reth_db::{database::Database, models::BlockNumberAddress};
use reth_primitives::stage::{StageCheckpoint, StageId};
use reth_provider::DatabaseProviderRW;
use reth_provider::{DatabaseProviderRW, HistoryWriter, StorageReader};
use std::fmt::Debug;
/// Stage that indexes the storage history from the storage changesets generated during execution.
@@ -46,7 +46,7 @@ impl<DB: Database> Stage<DB> for IndexStorageHistoryStage {
let (range, is_final_range) = input.next_block_range_with_threshold(self.commit_threshold);
let indices = provider.get_storage_block_numbers_from_changesets(range.clone())?;
let indices = provider.changed_storages_and_blocks_with_range(range.clone())?;
provider.insert_storage_history_index(indices)?;
Ok(ExecOutput { checkpoint: StageCheckpoint::new(*range.end()), done: is_final_range })

View File

@@ -24,7 +24,6 @@ tokio-stream = { workspace = true, features = ["sync"] }
tracing = { workspace = true }
# misc
thiserror = { workspace = true }
auto_impl = "1.0"
itertools = "0.10"
pin-project = { workspace = true }

View File

@@ -19,9 +19,10 @@ pub use traits::{
BlockIdProvider, BlockNumProvider, BlockProvider, BlockProviderIdExt, BlockSource,
BlockchainTreePendingStateProvider, CanonChainTracker, CanonStateNotification,
CanonStateNotificationSender, CanonStateNotifications, CanonStateSubscriptions, EvmEnvProvider,
ExecutorFactory, HeaderProvider, PostStateDataProvider, ReceiptProvider, ReceiptProviderIdExt,
StageCheckpointReader, StageCheckpointWriter, StateProvider, StateProviderBox,
StateProviderFactory, StateRootProvider, TransactionsProvider, WithdrawalsProvider,
ExecutorFactory, HashingWriter, HeaderProvider, HistoryWriter, PostStateDataProvider,
ReceiptProvider, ReceiptProviderIdExt, StageCheckpointReader, StageCheckpointWriter,
StateProvider, StateProviderBox, StateProviderFactory, StateRootProvider, StorageReader,
TransactionsProvider, WithdrawalsProvider,
};
/// Provider trait implementations.
@@ -35,10 +36,6 @@ pub use providers::{
pub mod post_state;
pub use post_state::PostState;
/// Helper types for interacting with the database
mod transaction;
pub use transaction::TransactionError;
/// Common database utilities.
mod utils;
pub use utils::{insert_block, insert_canonical_block};

View File

@@ -3,8 +3,8 @@ use crate::{
post_state::StorageChangeset,
traits::{AccountExtReader, BlockSource, ReceiptProvider, StageCheckpointWriter},
AccountReader, AccountWriter, BlockHashProvider, BlockNumProvider, BlockProvider,
EvmEnvProvider, HeaderProvider, PostState, ProviderError, StageCheckpointReader,
TransactionError, TransactionsProvider, WithdrawalsProvider,
EvmEnvProvider, HashingWriter, HeaderProvider, HistoryWriter, PostState, ProviderError,
StageCheckpointReader, StorageReader, TransactionsProvider, WithdrawalsProvider,
};
use itertools::{izip, Itertools};
use reth_db::{
@@ -145,7 +145,7 @@ fn unwind_storage_history_shards<'a, TX: reth_db::transaction::DbTxMutGAT<'a>>(
address: Address,
storage_key: H256,
block_number: BlockNumber,
) -> std::result::Result<Vec<usize>, TransactionError> {
) -> Result<Vec<usize>> {
let mut item = cursor.seek_exact(StorageShardedKey::new(address, storage_key, u64::MAX))?;
while let Some((storage_sharded_key, list)) = item {
@@ -205,77 +205,6 @@ impl<'this, TX: DbTx<'this>> DatabaseProvider<'this, TX> {
.walk(Some(T::Key::default()))?
.collect::<std::result::Result<Vec<_>, DatabaseError>>()
}
// TODO(joshie) TEMPORARY should be moved to trait providers
/// Iterate over storage changesets and return all addresses and storage keys that were changed.
pub fn get_addresses_and_keys_of_changed_storages(
&self,
range: RangeInclusive<BlockNumber>,
) -> std::result::Result<BTreeMap<Address, BTreeSet<H256>>, TransactionError> {
self.tx
.cursor_read::<tables::StorageChangeSet>()?
.walk_range(BlockNumberAddress::range(range))?
// fold all storages and save their old state so we can remove them from HashedStorage;
// this is needed because it is a dup-sorted table.
.try_fold(BTreeMap::new(), |mut accounts: BTreeMap<Address, BTreeSet<H256>>, entry| {
let (BlockNumberAddress((_, address)), storage_entry) = entry?;
accounts.entry(address).or_default().insert(storage_entry.key);
Ok(accounts)
})
}
/// Get plainstate storages
#[allow(clippy::type_complexity)]
pub fn get_plainstate_storages(
&self,
iter: impl IntoIterator<Item = (Address, impl IntoIterator<Item = H256>)>,
) -> std::result::Result<Vec<(Address, Vec<(H256, U256)>)>, TransactionError> {
let mut plain_storage = self.tx.cursor_dup_read::<tables::PlainStorageState>()?;
iter.into_iter()
.map(|(address, storage)| {
storage
.into_iter()
.map(|key| -> std::result::Result<_, TransactionError> {
let ret = plain_storage
.seek_by_key_subkey(address, key)?
.filter(|v| v.key == key)
.unwrap_or_default();
Ok((key, ret.value))
})
.collect::<std::result::Result<Vec<(_, _)>, _>>()
.map(|storage| (address, storage))
})
.collect::<std::result::Result<Vec<(_, _)>, _>>()
}
/// Get all block numbers where storage slots were changed.
///
/// NOTE: Takes an inclusive range of blocks.
pub fn get_storage_block_numbers_from_changesets(
&self,
range: RangeInclusive<BlockNumber>,
) -> std::result::Result<BTreeMap<(Address, H256), Vec<u64>>, TransactionError> {
let mut changeset_cursor = self.tx.cursor_read::<tables::StorageChangeSet>()?;
let storage_changeset_lists =
changeset_cursor.walk_range(BlockNumberAddress::range(range))?.try_fold(
BTreeMap::new(),
|mut storages: BTreeMap<(Address, H256), Vec<u64>>,
entry|
-> std::result::Result<_, TransactionError> {
let (index, storage) = entry?;
storages
.entry((index.address(), storage.key))
.or_default()
.push(index.block_number());
Ok(storages)
},
)?;
Ok(storage_changeset_lists)
}
}
impl<'this, TX: DbTxMut<'this> + DbTx<'this>> DatabaseProvider<'this, TX> {
@@ -291,7 +220,7 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> DatabaseProvider<'this, TX> {
&self,
chain_spec: &ChainSpec,
range: RangeInclusive<BlockNumber>,
) -> std::result::Result<Vec<(SealedBlockWithSenders, PostState)>, TransactionError> {
) -> Result<Vec<(SealedBlockWithSenders, PostState)>> {
self.get_take_block_and_execution_range::<false>(chain_spec, range)
}
@@ -300,107 +229,10 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> DatabaseProvider<'this, TX> {
&self,
chain_spec: &ChainSpec,
range: RangeInclusive<BlockNumber>,
) -> std::result::Result<Vec<(SealedBlockWithSenders, PostState)>, TransactionError> {
) -> Result<Vec<(SealedBlockWithSenders, PostState)>> {
self.get_take_block_and_execution_range::<true>(chain_spec, range)
}
/// Unwind and clear storage hashing
pub fn unwind_storage_hashing(
&self,
range: Range<BlockNumberAddress>,
) -> std::result::Result<(), TransactionError> {
let mut hashed_storage = self.tx.cursor_dup_write::<tables::HashedStorage>()?;
// Aggregate all block changesets and make a list of the storage entries that have changed.
self.tx
.cursor_read::<tables::StorageChangeSet>()?
.walk_range(range)?
.collect::<std::result::Result<Vec<_>, _>>()?
.into_iter()
.rev()
// fold all storage entries to collect their old values and the entries that need to be removed
.fold(
BTreeMap::new(),
|mut accounts: BTreeMap<(Address, H256), U256>,
(BlockNumberAddress((_, address)), storage_entry)| {
accounts.insert((address, storage_entry.key), storage_entry.value);
accounts
},
)
.into_iter()
// hash addresses and keys and collect them inside a sorted BTreeMap.
// We are doing keccak only once per entry.
.map(|((address, key), value)| ((keccak256(address), keccak256(key)), value))
.collect::<BTreeMap<_, _>>()
.into_iter()
// Apply values to HashedStorage (if the value is zero, just remove the entry).
.try_for_each(
|((hashed_address, key), value)| -> std::result::Result<(), TransactionError> {
if hashed_storage
.seek_by_key_subkey(hashed_address, key)?
.filter(|entry| entry.key == key)
.is_some()
{
hashed_storage.delete_current()?;
}
if value != U256::ZERO {
hashed_storage.upsert(hashed_address, StorageEntry { key, value })?;
}
Ok(())
},
)?;
Ok(())
}
/// Unwind and clear storage history indices.
///
/// Returns number of changesets walked.
pub fn unwind_storage_history_indices(
&self,
range: Range<BlockNumberAddress>,
) -> std::result::Result<usize, TransactionError> {
let storage_changesets = self
.tx
.cursor_read::<tables::StorageChangeSet>()?
.walk_range(range)?
.collect::<std::result::Result<Vec<_>, _>>()?;
let changesets = storage_changesets.len();
let last_indices = storage_changesets
.into_iter()
// reverse so we can get the lowest block number where we need to unwind the storage.
.rev()
// fold all storages and get last block number
.fold(
BTreeMap::new(),
|mut accounts: BTreeMap<(Address, H256), u64>, (index, storage)| {
// we just need the address and the lowest block number.
accounts.insert((index.address(), storage.key), index.block_number());
accounts
},
);
let mut cursor = self.tx.cursor_write::<tables::StorageHistory>()?;
for ((address, storage_key), rem_index) in last_indices {
let shard_part =
unwind_storage_history_shards::<TX>(&mut cursor, address, storage_key, rem_index)?;
// check the last shard_part; if present, its items need to be reinserted.
if !shard_part.is_empty() {
// there are items in list
self.tx.put::<tables::StorageHistory>(
StorageShardedKey::new(address, storage_key, u64::MAX),
BlockNumberList::new(shard_part)
.expect("There is at least one element in list and it is sorted."),
)?;
}
}
Ok(changesets)
}
/// Traverse over changesets and plain state and recreate the [`PostState`]s for the given range
/// of blocks.
///
@@ -427,7 +259,7 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> DatabaseProvider<'this, TX> {
fn get_take_block_execution_result_range<const TAKE: bool>(
&self,
range: RangeInclusive<BlockNumber>,
) -> std::result::Result<Vec<PostState>, TransactionError> {
) -> Result<Vec<PostState>> {
if range.is_empty() {
return Ok(Vec::new())
}
@@ -587,7 +419,7 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> DatabaseProvider<'this, TX> {
&self,
chain_spec: &ChainSpec,
range: RangeInclusive<BlockNumber>,
) -> std::result::Result<Vec<(SealedBlockWithSenders, PostState)>, TransactionError> {
) -> Result<Vec<(SealedBlockWithSenders, PostState)>> {
if TAKE {
let storage_range = BlockNumberAddress::range(range.clone());
@@ -598,7 +430,8 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> DatabaseProvider<'this, TX> {
// merkle tree
let (new_state_root, trie_updates) =
StateRoot::incremental_root_with_updates(&self.tx, range.clone())?;
StateRoot::incremental_root_with_updates(&self.tx, range.clone())
.map_err(Into::<reth_db::DatabaseError>::into)?;
let parent_number = range.start().saturating_sub(1);
let parent_state_root = self
@@ -612,12 +445,13 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> DatabaseProvider<'this, TX> {
let parent_hash = self
.block_hash(parent_number)?
.ok_or_else(|| ProviderError::HeaderNotFound(parent_number.into()))?;
return Err(TransactionError::UnwindStateRootMismatch {
return Err(ProviderError::UnwindStateRootMismatch {
got: new_state_root,
expected: parent_state_root,
block_number: parent_number,
block_hash: parent_hash,
})
}
.into())
}
trie_updates.flush(&self.tx)?;
}
@@ -675,8 +509,7 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> DatabaseProvider<'this, TX> {
fn get_take_block_transaction_range<const TAKE: bool>(
&self,
range: impl RangeBounds<BlockNumber> + Clone,
) -> std::result::Result<Vec<(BlockNumber, Vec<TransactionSignedEcRecovered>)>, TransactionError>
{
) -> Result<Vec<(BlockNumber, Vec<TransactionSignedEcRecovered>)>> {
// Read the range of block bodies to get all transaction ids in this range.
let block_bodies = self.get_or_take::<tables::BlockBodyIndices, false>(range)?;
@@ -755,7 +588,7 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> DatabaseProvider<'this, TX> {
&self,
chain_spec: &ChainSpec,
range: impl RangeBounds<BlockNumber> + Clone,
) -> std::result::Result<Vec<SealedBlockWithSenders>, TransactionError> {
) -> Result<Vec<SealedBlockWithSenders>> {
// For blocks we need Headers, Bodies, Uncles, Withdrawals, Transactions, and Signers
let block_headers = self.get_or_take::<tables::Headers, TAKE>(range.clone())?;
@@ -837,51 +670,8 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> DatabaseProvider<'this, TX> {
Ok(blocks)
}
/// Insert storage change index into the database. Used inside the StorageHistoryIndex stage.
pub fn insert_storage_history_index(
&self,
storage_transitions: BTreeMap<(Address, H256), Vec<u64>>,
) -> std::result::Result<(), TransactionError> {
for ((address, storage_key), mut indices) in storage_transitions {
let mut last_shard = self.take_last_storage_shard(address, storage_key)?;
last_shard.append(&mut indices);
// chunk indices and insert them in shards of N size.
let mut chunks = last_shard
.iter()
.chunks(storage_sharded_key::NUM_OF_INDICES_IN_SHARD)
.into_iter()
.map(|chunks| chunks.map(|i| *i as usize).collect::<Vec<usize>>())
.collect::<Vec<_>>();
let last_chunk = chunks.pop();
// insert all full shards, each keyed by its highest block number.
chunks.into_iter().try_for_each(|list| {
self.tx.put::<tables::StorageHistory>(
StorageShardedKey::new(
address,
storage_key,
*list.last().expect("Chunks does not return an empty list") as BlockNumber,
),
BlockNumberList::new(list).expect("Indices are presorted and not empty"),
)
})?;
// Insert last list with u64::MAX
if let Some(last_list) = last_chunk {
self.tx.put::<tables::StorageHistory>(
StorageShardedKey::new(address, storage_key, u64::MAX),
BlockNumberList::new(last_list).expect("Indices are presorted and not empty"),
)?;
}
}
Ok(())
}
/// Query the block body by number.
pub fn block_body_indices(
&self,
number: BlockNumber,
) -> std::result::Result<StoredBlockBodyIndices, TransactionError> {
pub fn block_body_indices(&self, number: BlockNumber) -> Result<StoredBlockBodyIndices> {
let body = self
.tx
.get::<tables::BlockBodyIndices>(number)?
@@ -948,11 +738,7 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> DatabaseProvider<'this, TX> {
/// Load the last shard and, if it is not full, remove it. If the returned list is empty, the last
/// shard was full or there are no shards at all.
pub fn take_last_storage_shard(
&self,
address: Address,
storage_key: H256,
) -> std::result::Result<Vec<u64>, TransactionError> {
pub fn take_last_storage_shard(&self, address: Address, storage_key: H256) -> Result<Vec<u64>> {
let mut cursor = self.tx.cursor_read::<tables::StorageHistory>()?;
let last = cursor.seek_exact(StorageShardedKey::new(address, storage_key, u64::MAX))?;
if let Some((storage_shard_key, list)) = last {
@@ -963,44 +749,6 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> DatabaseProvider<'this, TX> {
}
Ok(Vec::new())
}
/// Iterate over storages and insert them into the hashing table.
pub fn insert_storage_for_hashing(
&self,
storages: impl IntoIterator<Item = (Address, impl IntoIterator<Item = (H256, U256)>)>,
) -> std::result::Result<(), TransactionError> {
// hash values
let hashed = storages.into_iter().fold(BTreeMap::new(), |mut map, (address, storage)| {
let storage = storage.into_iter().fold(BTreeMap::new(), |mut map, (key, value)| {
map.insert(keccak256(key), value);
map
});
map.insert(keccak256(address), storage);
map
});
let mut hashed_storage = self.tx.cursor_dup_write::<tables::HashedStorage>()?;
// Hash the address and key, and apply them to HashedStorage (if the value is zero,
// just remove the entry).
hashed.into_iter().try_for_each(|(hashed_address, storage)| {
storage.into_iter().try_for_each(
|(key, value)| -> std::result::Result<(), TransactionError> {
if hashed_storage
.seek_by_key_subkey(hashed_address, key)?
.filter(|entry| entry.key == key)
.is_some()
{
hashed_storage.delete_current()?;
}
if value != U256::ZERO {
hashed_storage.upsert(hashed_address, StorageEntry { key, value })?;
}
Ok(())
},
)
})?;
Ok(())
}
/// Append blocks and insert their post state.
/// This will insert the block data into all related tables and update the pipeline progress.
@@ -1008,7 +756,7 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> DatabaseProvider<'this, TX> {
&mut self,
blocks: Vec<SealedBlockWithSenders>,
state: PostState,
) -> std::result::Result<(), TransactionError> {
) -> Result<()> {
if blocks.is_empty() {
return Ok(())
}
@@ -1047,73 +795,10 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> DatabaseProvider<'this, TX> {
&mut self,
block: SealedBlock,
senders: Option<Vec<Address>>,
) -> std::result::Result<(), TransactionError> {
) -> Result<()> {
insert_canonical_block(self.tx_mut(), block, senders)?;
Ok(())
}
/// Read account/storage changesets and update account/storage history indices.
pub fn calculate_history_indices(
&mut self,
range: RangeInclusive<BlockNumber>,
) -> std::result::Result<(), TransactionError> {
// account history stage
{
let indices = self.changed_accounts_and_blocks_with_range(range.clone())?;
self.insert_account_history_index(indices)?;
}
// storage history stage
{
let indices = self.get_storage_block_numbers_from_changesets(range)?;
self.insert_storage_history_index(indices)?;
}
Ok(())
}
/// Calculate the hashes of all changed accounts and storages, and finally calculate the state
/// root.
///
/// The hashes are calculated from `fork_block_number + 1` to `current_block_number`.
///
/// The resulting state root is compared with `expected_state_root`.
pub fn insert_hashes(
&mut self,
range: RangeInclusive<BlockNumber>,
end_block_hash: H256,
expected_state_root: H256,
) -> std::result::Result<(), TransactionError> {
// storage hashing stage
{
let lists = self.get_addresses_and_keys_of_changed_storages(range.clone())?;
let storages = self.get_plainstate_storages(lists.into_iter())?;
self.insert_storage_for_hashing(storages.into_iter())?;
}
// account hashing stage
{
let lists = self.changed_accounts_with_range(range.clone())?;
let accounts = self.basic_accounts(lists.into_iter())?;
self.insert_account_for_hashing(accounts.into_iter())?;
}
// merkle tree
{
let (state_root, trie_updates) =
StateRoot::incremental_root_with_updates(&self.tx, range.clone())?;
if state_root != expected_state_root {
return Err(TransactionError::StateRootMismatch {
got: state_root,
expected: expected_state_root,
block_number: *range.end(),
block_hash: end_block_hash,
})
}
trie_updates.flush(&self.tx)?;
}
Ok(())
}
}
impl<'this, TX: DbTx<'this>> AccountReader for DatabaseProvider<'this, TX> {
@@ -1816,3 +1501,282 @@ impl<'this, TX: DbTxMut<'this>> StageCheckpointWriter for DatabaseProvider<'this
Ok(())
}
}
impl<'this, TX: DbTx<'this>> StorageReader for DatabaseProvider<'this, TX> {
fn plainstate_storages(
&self,
addresses_with_keys: impl IntoIterator<Item = (Address, impl IntoIterator<Item = H256>)>,
) -> Result<Vec<(Address, Vec<StorageEntry>)>> {
let mut plain_storage = self.tx.cursor_dup_read::<tables::PlainStorageState>()?;
addresses_with_keys
.into_iter()
.map(|(address, storage)| {
storage
.into_iter()
.map(|key| -> Result<_> {
Ok(plain_storage
.seek_by_key_subkey(address, key)?
.filter(|v| v.key == key)
.unwrap_or_else(|| StorageEntry { key, value: Default::default() }))
})
.collect::<Result<Vec<_>>>()
.map(|storage| (address, storage))
})
.collect::<Result<Vec<(_, _)>>>()
}
fn changed_storages_with_range(
&self,
range: RangeInclusive<BlockNumber>,
) -> Result<BTreeMap<Address, BTreeSet<H256>>> {
self.tx
.cursor_read::<tables::StorageChangeSet>()?
.walk_range(BlockNumberAddress::range(range))?
// fold all storages and save their old state so we can remove them from HashedStorage;
// this is needed because it is a dup-sorted table.
.try_fold(BTreeMap::new(), |mut accounts: BTreeMap<Address, BTreeSet<H256>>, entry| {
let (BlockNumberAddress((_, address)), storage_entry) = entry?;
accounts.entry(address).or_default().insert(storage_entry.key);
Ok(accounts)
})
}
fn changed_storages_and_blocks_with_range(
&self,
range: RangeInclusive<BlockNumber>,
) -> Result<BTreeMap<(Address, H256), Vec<u64>>> {
let mut changeset_cursor = self.tx.cursor_read::<tables::StorageChangeSet>()?;
let storage_changeset_lists =
changeset_cursor.walk_range(BlockNumberAddress::range(range))?.try_fold(
BTreeMap::new(),
|mut storages: BTreeMap<(Address, H256), Vec<u64>>, entry| -> Result<_> {
let (index, storage) = entry?;
storages
.entry((index.address(), storage.key))
.or_default()
.push(index.block_number());
Ok(storages)
},
)?;
Ok(storage_changeset_lists)
}
}
impl<'this, TX: DbTxMut<'this> + DbTx<'this>> HashingWriter for DatabaseProvider<'this, TX> {
fn insert_hashes(
&self,
range: RangeInclusive<BlockNumber>,
end_block_hash: H256,
expected_state_root: H256,
) -> Result<()> {
// storage hashing stage
{
let lists = self.changed_storages_with_range(range.clone())?;
let storages = self.plainstate_storages(lists.into_iter())?;
self.insert_storage_for_hashing(storages.into_iter())?;
}
// account hashing stage
{
let lists = self.changed_accounts_with_range(range.clone())?;
let accounts = self.basic_accounts(lists.into_iter())?;
self.insert_account_for_hashing(accounts.into_iter())?;
}
// merkle tree
{
let (state_root, trie_updates) =
StateRoot::incremental_root_with_updates(&self.tx, range.clone())
.map_err(Into::<reth_db::DatabaseError>::into)?;
if state_root != expected_state_root {
return Err(ProviderError::StateRootMismatch {
got: state_root,
expected: expected_state_root,
block_number: *range.end(),
block_hash: end_block_hash,
}
.into())
}
trie_updates.flush(&self.tx)?;
}
Ok(())
}
fn unwind_storage_hashing(&self, range: Range<BlockNumberAddress>) -> Result<()> {
let mut hashed_storage = self.tx.cursor_dup_write::<tables::HashedStorage>()?;
// Aggregate all block changesets and make a list of the storage entries that have changed.
self.tx
.cursor_read::<tables::StorageChangeSet>()?
.walk_range(range)?
.collect::<std::result::Result<Vec<_>, _>>()?
.into_iter()
.rev()
// fold all storage entries to collect their old values and the entries that need to be removed
.fold(
BTreeMap::new(),
|mut accounts: BTreeMap<(Address, H256), U256>,
(BlockNumberAddress((_, address)), storage_entry)| {
accounts.insert((address, storage_entry.key), storage_entry.value);
accounts
},
)
.into_iter()
// hash addresses and keys and collect them inside a sorted BTreeMap.
// We are doing keccak only once per entry.
.map(|((address, key), value)| ((keccak256(address), keccak256(key)), value))
.collect::<BTreeMap<_, _>>()
.into_iter()
// Apply values to HashedStorage (if the value is zero, just remove the entry).
.try_for_each(|((hashed_address, key), value)| -> Result<()> {
if hashed_storage
.seek_by_key_subkey(hashed_address, key)?
.filter(|entry| entry.key == key)
.is_some()
{
hashed_storage.delete_current()?;
}
if value != U256::ZERO {
hashed_storage.upsert(hashed_address, StorageEntry { key, value })?;
}
Ok(())
})?;
Ok(())
}
fn insert_storage_for_hashing(
&self,
storages: impl IntoIterator<Item = (Address, impl IntoIterator<Item = StorageEntry>)>,
) -> Result<()> {
// hash values
let hashed = storages.into_iter().fold(BTreeMap::new(), |mut map, (address, storage)| {
let storage = storage.into_iter().fold(BTreeMap::new(), |mut map, entry| {
map.insert(keccak256(entry.key), entry.value);
map
});
map.insert(keccak256(address), storage);
map
});
let mut hashed_storage = self.tx.cursor_dup_write::<tables::HashedStorage>()?;
// Hash the address and key, and apply them to HashedStorage (if the value is zero,
// just remove the entry).
hashed.into_iter().try_for_each(|(hashed_address, storage)| {
storage.into_iter().try_for_each(|(key, value)| -> Result<()> {
if hashed_storage
.seek_by_key_subkey(hashed_address, key)?
.filter(|entry| entry.key == key)
.is_some()
{
hashed_storage.delete_current()?;
}
if value != U256::ZERO {
hashed_storage.upsert(hashed_address, StorageEntry { key, value })?;
}
Ok(())
})
})?;
Ok(())
}
}
impl<'this, TX: DbTxMut<'this> + DbTx<'this>> HistoryWriter for DatabaseProvider<'this, TX> {
fn calculate_history_indices(&self, range: RangeInclusive<BlockNumber>) -> Result<()> {
// account history stage
{
let indices = self.changed_accounts_and_blocks_with_range(range.clone())?;
self.insert_account_history_index(indices)?;
}
// storage history stage
{
let indices = self.changed_storages_and_blocks_with_range(range)?;
self.insert_storage_history_index(indices)?;
}
Ok(())
}
fn insert_storage_history_index(
&self,
storage_transitions: BTreeMap<(Address, H256), Vec<u64>>,
) -> Result<()> {
for ((address, storage_key), mut indices) in storage_transitions {
let mut last_shard = self.take_last_storage_shard(address, storage_key)?;
last_shard.append(&mut indices);
// chunk indices and insert them in shards of N size.
let mut chunks = last_shard
.iter()
.chunks(storage_sharded_key::NUM_OF_INDICES_IN_SHARD)
.into_iter()
.map(|chunks| chunks.map(|i| *i as usize).collect::<Vec<usize>>())
.collect::<Vec<_>>();
let last_chunk = chunks.pop();
// insert all full shards, each keyed by its highest block number.
chunks.into_iter().try_for_each(|list| {
self.tx.put::<tables::StorageHistory>(
StorageShardedKey::new(
address,
storage_key,
*list.last().expect("Chunks does not return an empty list") as BlockNumber,
),
BlockNumberList::new(list).expect("Indices are presorted and not empty"),
)
})?;
// Insert last list with u64::MAX
if let Some(last_list) = last_chunk {
self.tx.put::<tables::StorageHistory>(
StorageShardedKey::new(address, storage_key, u64::MAX),
BlockNumberList::new(last_list).expect("Indices are presorted and not empty"),
)?;
}
}
Ok(())
}
fn unwind_storage_history_indices(&self, range: Range<BlockNumberAddress>) -> Result<usize> {
let storage_changesets = self
.tx
.cursor_read::<tables::StorageChangeSet>()?
.walk_range(range)?
.collect::<std::result::Result<Vec<_>, _>>()?;
let changesets = storage_changesets.len();
let last_indices = storage_changesets
.into_iter()
// reverse so we can get the lowest block number where we need to unwind the storage.
.rev()
// fold all storages and get last block number
.fold(
BTreeMap::new(),
|mut accounts: BTreeMap<(Address, H256), u64>, (index, storage)| {
// we just need the address and the lowest block number.
accounts.insert((index.address(), storage.key), index.block_number());
accounts
},
);
let mut cursor = self.tx.cursor_write::<tables::StorageHistory>()?;
for ((address, storage_key), rem_index) in last_indices {
let shard_part =
unwind_storage_history_shards::<TX>(&mut cursor, address, storage_key, rem_index)?;
// check the last shard_part; if present, its items need to be reinserted.
if !shard_part.is_empty() {
// there are items in list
self.tx.put::<tables::StorageHistory>(
StorageShardedKey::new(address, storage_key, u64::MAX),
BlockNumberList::new(shard_part)
.expect("There is at least one element in list and it is sorted."),
)?;
}
}
Ok(changesets)
}
}
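The shard layout used by `insert_storage_history_index` stores each full shard under its highest block number and the trailing partial shard under the `u64::MAX` sentinel, so the next insert can load and extend it via `take_last_storage_shard`. A simplified, self-contained illustration of the chunk splitting (assuming a hypothetical shard size of 3 in place of `storage_sharded_key::NUM_OF_INDICES_IN_SHARD`):

use itertools::Itertools;

fn main() {
    const SHARD_SIZE: usize = 3;
    let last_shard: Vec<u64> = vec![1, 2, 3, 4, 5, 6, 7];
    let mut chunks = last_shard
        .iter()
        .chunks(SHARD_SIZE)
        .into_iter()
        .map(|chunk| chunk.copied().collect::<Vec<u64>>())
        .collect::<Vec<_>>();
    let last_chunk = chunks.pop();
    // Full shards would be written under keys 3 and 6 (their highest block).
    assert_eq!(chunks, vec![vec![1, 2, 3], vec![4, 5, 6]]);
    // The partial shard would be written under the u64::MAX sentinel key.
    assert_eq!(last_chunk, Some(vec![7]));
}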

View File

@@ -0,0 +1,31 @@
use auto_impl::auto_impl;
use reth_db::models::BlockNumberAddress;
use reth_interfaces::Result;
use reth_primitives::{Address, BlockNumber, StorageEntry, H256};
use std::ops::{Range, RangeInclusive};
/// Hashing Writer
#[auto_impl(&, Arc, Box)]
pub trait HashingWriter: Send + Sync {
/// Unwind and clear storage hashing
fn unwind_storage_hashing(&self, range: Range<BlockNumberAddress>) -> Result<()>;
/// Iterate over storages and insert them into the hashing table.
fn insert_storage_for_hashing(
&self,
storages: impl IntoIterator<Item = (Address, impl IntoIterator<Item = StorageEntry>)>,
) -> Result<()>;
/// Calculate the hashes of all changed accounts and storages, and finally calculate the state
/// root.
///
/// The hashes are calculated from `fork_block_number + 1` to `current_block_number`.
///
/// The resulting state root is compared with `expected_state_root`.
fn insert_hashes(
&self,
range: RangeInclusive<BlockNumber>,
end_block_hash: H256,
expected_state_root: H256,
) -> Result<()>;
}
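A sketch of driving the trait from generic code (the function and its parameters are hypothetical; `HashingWriter` is re-exported from `reth_provider` as shown earlier in this diff):

use reth_interfaces::Result;
use reth_primitives::{BlockNumber, H256};
use reth_provider::HashingWriter;

/// Re-hash everything changed after a fork point and verify the resulting
/// state root against the one expected at the tip block.
fn rehash_after_fork<P: HashingWriter>(
    provider: &P,
    fork_block: BlockNumber,
    tip_block: BlockNumber,
    tip_hash: H256,
    expected_state_root: H256,
) -> Result<()> {
    provider.insert_hashes(fork_block + 1..=tip_block, tip_hash, expected_state_root)
}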

View File

@@ -0,0 +1,26 @@
use auto_impl::auto_impl;
use reth_db::models::BlockNumberAddress;
use reth_interfaces::Result;
use reth_primitives::{Address, BlockNumber, H256};
use std::{
collections::BTreeMap,
ops::{Range, RangeInclusive},
};
/// History Writer
#[auto_impl(&, Arc, Box)]
pub trait HistoryWriter: Send + Sync {
/// Unwind and clear storage history indices.
///
/// Returns number of changesets walked.
fn unwind_storage_history_indices(&self, range: Range<BlockNumberAddress>) -> Result<usize>;
/// Insert storage change index into the database. Used inside the StorageHistoryIndex stage.
fn insert_storage_history_index(
&self,
storage_transitions: BTreeMap<(Address, H256), Vec<u64>>,
) -> Result<()>;
/// Read account/storage changesets and update account/storage history indices.
fn calculate_history_indices(&self, range: RangeInclusive<BlockNumber>) -> Result<()>;
}
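A sketch of unwinding storage history above a target block (hypothetical helper; `BlockNumberAddress::range` converts a block range into the changeset key range, as used elsewhere in this diff):

use reth_db::models::BlockNumberAddress;
use reth_interfaces::Result;
use reth_primitives::BlockNumber;
use reth_provider::HistoryWriter;

/// Remove storage history indices for every block above `target` and
/// report how many changesets were walked.
fn unwind_storage_history_above<P: HistoryWriter>(
    provider: &P,
    target: BlockNumber,
) -> Result<usize> {
    let range = BlockNumberAddress::range(target + 1..=u64::MAX);
    provider.unwind_storage_history_indices(range)
}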

View File

@@ -3,6 +3,9 @@
mod account;
pub use account::{AccountExtReader, AccountReader, AccountWriter};
mod storage;
pub use storage::StorageReader;
mod block;
pub use block::{BlockProvider, BlockProviderIdExt, BlockSource};
@@ -47,3 +50,9 @@ pub use chain::{
mod stage_checkpoint;
pub use stage_checkpoint::{StageCheckpointReader, StageCheckpointWriter};
mod hashing;
pub use hashing::HashingWriter;
mod history;
pub use history::HistoryWriter;

View File

@@ -0,0 +1,33 @@
use std::{
collections::{BTreeMap, BTreeSet},
ops::RangeInclusive,
};
use auto_impl::auto_impl;
use reth_interfaces::Result;
use reth_primitives::{Address, BlockNumber, StorageEntry, H256};
/// Storage reader
#[auto_impl(&, Arc, Box)]
pub trait StorageReader: Send + Sync {
/// Get plainstate storages for addresses and storage keys.
fn plainstate_storages(
&self,
addresses_with_keys: impl IntoIterator<Item = (Address, impl IntoIterator<Item = H256>)>,
) -> Result<Vec<(Address, Vec<StorageEntry>)>>;
/// Iterate over storage changesets and return all storage slots that were changed.
fn changed_storages_with_range(
&self,
range: RangeInclusive<BlockNumber>,
) -> Result<BTreeMap<Address, BTreeSet<H256>>>;
/// Iterate over storage changesets and return all storage slots that were changed,
/// together with the blocks in which they changed.
///
/// NOTE: Takes an inclusive range of blocks.
fn changed_storages_and_blocks_with_range(
&self,
range: RangeInclusive<BlockNumber>,
) -> Result<BTreeMap<(Address, H256), Vec<u64>>>;
}
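A sketch of consuming the reader (hypothetical helper):

use reth_interfaces::Result;
use reth_primitives::BlockNumber;
use reth_provider::StorageReader;
use std::ops::RangeInclusive;

/// Print which blocks touched each (address, slot) pair in the range.
fn print_changed_slots<P: StorageReader>(
    provider: &P,
    range: RangeInclusive<BlockNumber>,
) -> Result<()> {
    for ((address, slot), blocks) in provider.changed_storages_and_blocks_with_range(range)? {
        println!("{address:?} slot {slot:?} changed in blocks {blocks:?}");
    }
    Ok(())
}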

View File

@@ -3,47 +3,6 @@ use reth_primitives::{BlockHash, BlockNumber, H256};
use reth_trie::StateRootError;
use std::fmt::Debug;
/// An error that can occur when using the transaction container
#[derive(Debug, PartialEq, Eq, Clone, thiserror::Error)]
pub enum TransactionError {
/// The transaction encountered a database error.
#[error(transparent)]
Database(#[from] DbError),
/// The transaction encountered a database integrity error.
#[error(transparent)]
DatabaseIntegrity(#[from] ProviderError),
/// The trie error.
#[error(transparent)]
TrieError(#[from] StateRootError),
/// Root mismatch
#[error("Merkle trie root mismatch at #{block_number} ({block_hash:?}). Got: {got:?}. Expected: {expected:?}")]
StateRootMismatch {
/// Expected root
expected: H256,
/// Calculated root
got: H256,
/// Block number
block_number: BlockNumber,
/// Block hash
block_hash: BlockHash,
},
/// Root mismatch during unwind
#[error("Unwind merkle trie root mismatch at #{block_number} ({block_hash:?}). Got: {got:?}. Expected: {expected:?}")]
UnwindStateRootMismatch {
/// Expected root
expected: H256,
/// Calculated root
got: H256,
/// Target block number
block_number: BlockNumber,
/// Block hash
block_hash: BlockHash,
},
/// Internal interfaces error
#[error("Internal error")]
InternalError(#[from] reth_interfaces::Error),
}
#[cfg(test)]
mod test {
use crate::{

View File

@@ -1443,11 +1443,10 @@ mod tests {
let mut pool = AllTransactions::default();
let tx = MockTransaction::eip1559().inc_price().inc_limit();
let first = f.validated(tx.clone());
let _res = pool.insert_tx(first.clone(), on_chain_balance, on_chain_nonce);
let _res = pool.insert_tx(first, on_chain_balance, on_chain_nonce);
let mut replacement = f.validated(tx.rng_hash());
replacement.transaction = replacement.transaction.decr_price();
let err =
pool.insert_tx(replacement.clone(), on_chain_balance, on_chain_nonce).unwrap_err();
let err = pool.insert_tx(replacement, on_chain_balance, on_chain_nonce).unwrap_err();
assert!(matches!(err, InsertErr::Underpriced { .. }));
}