perf(trie): parallel storage roots

Roman Krasiuk
2024-02-14 23:37:51 +01:00
parent ada3547fd1
commit 1cf04f0978
11 changed files with 266 additions and 56 deletions

Cargo.lock

@@ -637,6 +637,17 @@ dependencies = [
"event-listener",
]
[[package]]
name = "async-scoped"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4042078ea593edffc452eef14e99fdb2b120caa4ad9618bcdeabc4a023b98740"
dependencies = [
"futures",
"pin-project",
"tokio",
]
[[package]]
name = "async-sse"
version = "5.1.0"
@@ -6958,6 +6969,7 @@ version = "0.1.0-alpha.17"
dependencies = [
"alloy-chains",
"alloy-rlp",
"async-scoped",
"auto_impl",
"criterion",
"derive_more",


@@ -243,7 +243,7 @@ impl Chain {
fn append_trie_updates(&mut self, other_trie_updates: Option<TrieUpdates>) {
if let Some((trie_updates, other)) = self.trie_updates.as_mut().zip(other_trie_updates) {
// Extend trie updates.
trie_updates.extend(other.into_iter());
trie_updates.extend(other);
} else {
// Reset trie updates as they are no longer valid.
self.trie_updates.take();


@@ -26,6 +26,8 @@ alloy-chains.workspace = true
tracing.workspace = true
# misc
tokio = { workspace = true, default-features = false, features = ["rt"] }
async-scoped = { version = "0.9.0", features = ["use-tokio"] }
thiserror.workspace = true
derive_more = "0.99"
auto_impl = "1"


@@ -0,0 +1,49 @@
use reth_db::{
cursor::DbCursorRO,
database::Database,
table::{DupSort, Table},
tables,
transaction::DbTx,
DatabaseError,
};
use reth_interfaces::db::DatabaseErrorInfo;
use reth_primitives::B256;
/// TODO:
#[derive(Clone, Debug)]
pub struct ConsistentDatabaseProvider<DB> {
database: DB,
tip: Option<B256>,
}
impl<DB: Database> ConsistentDatabaseProvider<DB> {
/// Create new database provider.
pub fn new(database: DB) -> Result<Self, DatabaseError> {
let tip =
database.tx()?.cursor_read::<tables::CanonicalHeaders>()?.last()?.map(|(_, hash)| hash);
Ok(Self { database, tip })
}
/// Create new database provider with tip
pub fn new_with_tip(database: DB, tip: Option<B256>) -> Self {
Self { database, tip }
}
/// Opens new read transaction and performs a consistency check on the current tip.
pub fn tx(&self) -> Result<DB::TX, DatabaseError> {
let tx = self.database.tx()?;
let tip = tx.cursor_read::<tables::CanonicalHeaders>()?.last()?.map(|(_, hash)| hash);
if self.tip != tip {
return Err(DatabaseError::InitTx(DatabaseErrorInfo {
message: format!(
"tx has inconsistent view. expected: {:?}. got: {:?}",
self.tip, tip
),
// Corresponds to `MDBX_PROBLEM`
// <https://github.com/paradigmxyz/reth/blob/ada3547fd17fa9ff2aaf16449811cf5287f1c089/crates/storage/libmdbx-rs/mdbx-sys/libmdbx/mdbx.h#L1892>
code: -30779,
}))
}
Ok(tx)
}
}
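The provider above captures the canonical tip hash at construction time and re-checks it whenever a new read transaction is opened, so every transaction used for a single root computation sees the same canonical chain. A minimal standalone sketch of that check, using stand-in types rather than the reth database API (FakeDb and ConsistentProvider below are illustrative names only):

use std::sync::atomic::{AtomicU64, Ordering};

// Stand-in for the database: the "tip" is just a block number.
struct FakeDb {
    tip: AtomicU64,
}

// Stand-in for ConsistentDatabaseProvider: remembers the tip it saw at creation.
struct ConsistentProvider<'a> {
    db: &'a FakeDb,
    tip: u64,
}

impl<'a> ConsistentProvider<'a> {
    fn new(db: &'a FakeDb) -> Self {
        Self { tip: db.tip.load(Ordering::SeqCst), db }
    }

    // Mirrors ConsistentDatabaseProvider::tx: a new read is only handed out
    // if the tip still matches the one captured at construction.
    fn tx(&self) -> Result<(), String> {
        let current = self.db.tip.load(Ordering::SeqCst);
        if current != self.tip {
            return Err(format!(
                "tx has inconsistent view. expected: {}. got: {}",
                self.tip, current
            ));
        }
        Ok(())
    }
}

fn main() {
    let db = FakeDb { tip: AtomicU64::new(100) };
    let provider = ConsistentProvider::new(&db);
    assert!(provider.tx().is_ok());
    db.tip.store(101, Ordering::SeqCst); // a new block becomes canonical
    assert!(provider.tx().is_err()); // later reads are rejected instead of mixing views
}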


@@ -1,20 +1,36 @@
use super::{HashedAccountCursor, HashedCursorFactory, HashedStorageCursor};
use crate::database_provider::ConsistentDatabaseProvider;
use reth_db::{
cursor::{DbCursorRO, DbDupCursorRO},
database::Database,
tables,
transaction::DbTx,
DatabaseError,
};
use reth_primitives::{Account, StorageEntry, B256};
impl<DB: Database> HashedCursorFactory for ConsistentDatabaseProvider<DB> {
type AccountCursor = <DB::TX as DbTx>::Cursor<tables::HashedAccount>;
type StorageCursor = <DB::TX as DbTx>::DupCursor<tables::HashedStorage>;
fn hashed_account_cursor(&self) -> Result<Self::AccountCursor, DatabaseError> {
self.tx()?.cursor_read::<tables::HashedAccount>()
}
fn hashed_storage_cursor(&self) -> Result<Self::StorageCursor, DatabaseError> {
self.tx()?.cursor_dup_read::<tables::HashedStorage>()
}
}
impl<'a, TX: DbTx> HashedCursorFactory for &'a TX {
type AccountCursor = <TX as DbTx>::Cursor<tables::HashedAccount>;
type StorageCursor = <TX as DbTx>::DupCursor<tables::HashedStorage>;
fn hashed_account_cursor(&self) -> Result<Self::AccountCursor, reth_db::DatabaseError> {
fn hashed_account_cursor(&self) -> Result<Self::AccountCursor, DatabaseError> {
self.cursor_read::<tables::HashedAccount>()
}
fn hashed_storage_cursor(&self) -> Result<Self::StorageCursor, reth_db::DatabaseError> {
fn hashed_storage_cursor(&self) -> Result<Self::StorageCursor, DatabaseError> {
self.cursor_dup_read::<tables::HashedStorage>()
}
}
@@ -23,11 +39,11 @@ impl<C> HashedAccountCursor for C
where
C: DbCursorRO<tables::HashedAccount>,
{
fn seek(&mut self, key: B256) -> Result<Option<(B256, Account)>, reth_db::DatabaseError> {
fn seek(&mut self, key: B256) -> Result<Option<(B256, Account)>, DatabaseError> {
self.seek(key)
}
fn next(&mut self) -> Result<Option<(B256, Account)>, reth_db::DatabaseError> {
fn next(&mut self) -> Result<Option<(B256, Account)>, DatabaseError> {
self.next()
}
}
@@ -36,19 +52,15 @@ impl<C> HashedStorageCursor for C
where
C: DbCursorRO<tables::HashedStorage> + DbDupCursorRO<tables::HashedStorage>,
{
fn is_storage_empty(&mut self, key: B256) -> Result<bool, reth_db::DatabaseError> {
fn is_storage_empty(&mut self, key: B256) -> Result<bool, DatabaseError> {
Ok(self.seek_exact(key)?.is_none())
}
fn seek(
&mut self,
key: B256,
subkey: B256,
) -> Result<Option<StorageEntry>, reth_db::DatabaseError> {
fn seek(&mut self, key: B256, subkey: B256) -> Result<Option<StorageEntry>, DatabaseError> {
self.seek_by_key_subkey(key, subkey)
}
fn next(&mut self) -> Result<Option<StorageEntry>, reth_db::DatabaseError> {
fn next(&mut self) -> Result<Option<StorageEntry>, DatabaseError> {
self.next_dup_val()
}
}


@@ -50,6 +50,9 @@ pub mod updates;
mod progress;
pub use progress::{IntermediateStateRootState, StateRootProgress};
/// TODO:
pub mod database_provider;
/// Collection of trie-related test utilities.
#[cfg(any(test, feature = "test-utils"))]
pub mod test_utils;


@@ -1,7 +1,7 @@
use reth_primitives::{trie::Nibbles, B256};
use std::{
collections::{HashMap, HashSet},
rc::Rc,
sync::Arc,
};
mod loader;
@@ -121,7 +121,7 @@ impl PrefixSetMut {
self.keys.dedup();
}
PrefixSet { keys: Rc::new(self.keys), index: self.index }
PrefixSet { keys: Arc::new(self.keys), index: self.index }
}
}
@@ -130,7 +130,7 @@ impl PrefixSetMut {
/// See also [PrefixSetMut::freeze].
#[derive(Debug, Default, Clone)]
pub struct PrefixSet {
keys: Rc<Vec<Nibbles>>,
keys: Arc<Vec<Nibbles>>,
index: usize,
}
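Switching the backing pointer from Rc to Arc keeps PrefixSet cheap to clone while making it Send, which is what allows the storage prefix sets to be moved into the spawned storage-root tasks below. A small illustration of the difference, using a stand-in type rather than the real PrefixSet:

use std::sync::Arc;

// Stand-in for PrefixSet: the shared key list lives behind an Arc.
#[derive(Clone)]
struct PrefixSetLike {
    keys: Arc<Vec<u8>>,
}

fn main() {
    let set = PrefixSetLike { keys: Arc::new(vec![1, 2, 3]) };
    let handle = std::thread::spawn({
        let set = set.clone(); // cheap: only the pointer is cloned
        move || set.keys.len()
    });
    assert_eq!(handle.join().unwrap(), 3);
    // With keys: Rc<Vec<u8>> this would not compile, because Rc is not Send
    // and therefore cannot be moved into another thread or spawned task.
}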


@@ -1,4 +1,5 @@
use crate::{
database_provider::ConsistentDatabaseProvider,
hashed_cursor::HashedPostStateCursorFactory,
prefix_set::{PrefixSetMut, TriePrefixSets},
updates::TrieUpdates,
@@ -6,6 +7,7 @@ use crate::{
};
use reth_db::{
cursor::DbCursorRO,
database::Database,
models::{AccountBeforeTx, BlockNumberAddress},
tables,
transaction::DbTx,
@@ -248,6 +250,33 @@ impl HashedPostState {
.with_prefix_sets(prefix_sets)
.root_with_updates()
}
/// TODO:
pub fn state_root_parallel<DB: Database + Clone>(
&self,
db: DB,
) -> Result<B256, StateRootError> {
let sorted = self.clone().into_sorted();
let prefix_sets = self.construct_prefix_sets();
let database = ConsistentDatabaseProvider::new(db)?;
StateRoot::from_db(database.clone())
.with_hashed_cursor_factory(HashedPostStateCursorFactory::new(database, &sorted))
.with_prefix_sets(prefix_sets)
.root_parallel()
}
pub fn state_root_parallel_with_updates<DB: Database + Clone>(
&self,
db: DB,
) -> Result<(B256, TrieUpdates), StateRootError> {
let sorted = self.clone().into_sorted();
let prefix_sets = self.construct_prefix_sets();
let database = ConsistentDatabaseProvider::new(db)?;
StateRoot::from_db(database.clone())
.with_hashed_cursor_factory(HashedPostStateCursorFactory::new(database, &sorted))
.with_prefix_sets(prefix_sets)
.root_parallel_with_updates()
}
}
/// Representation of in-memory hashed storage.


@@ -1,4 +1,5 @@
use crate::{
database_provider::ConsistentDatabaseProvider,
hashed_cursor::{HashedCursorFactory, HashedStorageCursor},
node_iter::{AccountNode, AccountNodeIter, StorageNode, StorageNodeIter},
prefix_set::{PrefixSet, PrefixSetLoader, PrefixSetMut, TriePrefixSets},
@@ -16,7 +17,7 @@ use reth_primitives::{
trie::{HashBuilder, Nibbles, TrieAccount},
Address, BlockNumber, B256,
};
use std::ops::RangeInclusive;
use std::{collections::HashMap, ops::RangeInclusive};
use tracing::{debug, trace};
/// StateRoot is used to compute the root node of a state trie.
@@ -28,6 +29,8 @@ pub struct StateRoot<T, H> {
pub hashed_cursor_factory: H,
/// A set of prefix sets that have changes.
pub prefix_sets: TriePrefixSets,
/// Pre-computed storage roots.
storage_roots: HashMap<B256, Result<(B256, usize, TrieUpdates), StorageRootError>>, // TODO:
/// Previous intermediate state.
previous_state: Option<IntermediateStateRootState>,
/// The number of updates after which the intermediate progress should be returned.
@@ -59,12 +62,22 @@ impl<T, H> StateRoot<T, H> {
self
}
/// Set pre-computed storage root results.
fn with_storage_roots(
mut self,
storage_roots: HashMap<B256, Result<(B256, usize, TrieUpdates), StorageRootError>>, /* TODO: */
) -> Self {
self.storage_roots = storage_roots;
self
}
/// Set the hashed cursor factory.
pub fn with_hashed_cursor_factory<HF>(self, hashed_cursor_factory: HF) -> StateRoot<T, HF> {
StateRoot {
trie_cursor_factory: self.trie_cursor_factory,
hashed_cursor_factory,
prefix_sets: self.prefix_sets,
storage_roots: self.storage_roots,
threshold: self.threshold,
previous_state: self.previous_state,
}
@@ -76,6 +89,7 @@ impl<T, H> StateRoot<T, H> {
trie_cursor_factory,
hashed_cursor_factory: self.hashed_cursor_factory,
prefix_sets: self.prefix_sets,
storage_roots: self.storage_roots,
threshold: self.threshold,
previous_state: self.previous_state,
}
@@ -89,6 +103,7 @@ impl<'a, TX: DbTx> StateRoot<&'a TX, &'a TX> {
trie_cursor_factory: tx,
hashed_cursor_factory: tx,
prefix_sets: TriePrefixSets::default(),
storage_roots: HashMap::default(),
previous_state: None,
threshold: 100_000,
}
@@ -118,7 +133,7 @@ impl<'a, TX: DbTx> StateRoot<&'a TX, &'a TX> {
tx: &'a TX,
range: RangeInclusive<BlockNumber>,
) -> Result<B256, StateRootError> {
debug!(target: "trie::loader", ?range, "incremental state root");
debug!(target: "trie::state_root", ?range, "incremental state root");
Self::incremental_root_calculator(tx, range)?.root()
}
@@ -134,7 +149,7 @@ impl<'a, TX: DbTx> StateRoot<&'a TX, &'a TX> {
tx: &'a TX,
range: RangeInclusive<BlockNumber>,
) -> Result<(B256, TrieUpdates), StateRootError> {
debug!(target: "trie::loader", ?range, "incremental state root");
debug!(target: "trie::state_root", ?range, "incremental state root");
Self::incremental_root_calculator(tx, range)?.root_with_updates()
}
@@ -148,11 +163,24 @@ impl<'a, TX: DbTx> StateRoot<&'a TX, &'a TX> {
tx: &'a TX,
range: RangeInclusive<BlockNumber>,
) -> Result<StateRootProgress, StateRootError> {
debug!(target: "trie::loader", ?range, "incremental state root with progress");
debug!(target: "trie::state_root", ?range, "incremental state root with progress");
Self::incremental_root_calculator(tx, range)?.root_with_progress()
}
}
impl<DB: Clone> StateRoot<ConsistentDatabaseProvider<DB>, ConsistentDatabaseProvider<DB>> {
pub fn from_db(db: ConsistentDatabaseProvider<DB>) -> Self {
Self {
trie_cursor_factory: db.clone(),
hashed_cursor_factory: db,
prefix_sets: TriePrefixSets::default(),
storage_roots: HashMap::default(),
previous_state: None,
threshold: 100_000,
}
}
}
impl<T, H> StateRoot<T, H>
where
T: TrieCursorFactory + Clone,
@@ -182,7 +210,7 @@ where
pub fn root(self) -> Result<B256, StateRootError> {
match self.calculate(false)? {
StateRootProgress::Complete(root, _, _) => Ok(root),
StateRootProgress::Progress(..) => unreachable!(), // update retenion is disabled
StateRootProgress::Progress(..) => unreachable!(), // update retention is disabled
}
}
@@ -196,8 +224,8 @@ where
self.calculate(true)
}
fn calculate(self, retain_updates: bool) -> Result<StateRootProgress, StateRootError> {
trace!(target: "trie::loader", "calculating state root");
fn calculate(mut self, retain_updates: bool) -> Result<StateRootProgress, StateRootError> {
trace!(target: "trie::state_root", "calculating state root");
let mut trie_updates = TrieUpdates::default();
let hashed_account_cursor = self.hashed_cursor_factory.hashed_account_cursor()?;
@@ -235,35 +263,17 @@ where
AccountNode::Leaf(hashed_address, account) => {
hashed_entries_walked += 1;
// We assume we can always calculate a storage root without
// OOMing. This opens us up to a potential DOS vector if
// a contract had too many storage entries and they were
// all buffered w/o us returning and committing our intermediate
// progress.
// TODO: We can consider introducing the TrieProgress::Progress/Complete
// abstraction inside StorageRoot, but let's give it a try as-is for now.
let storage_root_calculator = StorageRoot::new_hashed(
self.trie_cursor_factory.clone(),
self.hashed_cursor_factory.clone(),
hashed_address,
)
.with_prefix_set(
self.prefix_sets
.storage_prefix_sets
.get(&hashed_address)
.cloned()
.unwrap_or_default(),
);
let storage_root = if retain_updates {
let (root, storage_slots_walked, updates) =
storage_root_calculator.root_with_updates()?;
hashed_entries_walked += storage_slots_walked;
trie_updates.extend(updates.into_iter());
root
} else {
storage_root_calculator.root()?
};
let (storage_root, storage_slots_walked, updates) =
self.storage_roots.remove(&hashed_address).unwrap_or_else(|| {
StorageRoot::new_hashed(
self.trie_cursor_factory.clone(),
self.hashed_cursor_factory.clone(),
hashed_address,
)
.calculate(retain_updates)
})?;
hashed_entries_walked += storage_slots_walked;
trie_updates.extend(updates);
let account = TrieAccount::from((account, storage_root));
@@ -286,7 +296,7 @@ where
last_account_key: hashed_address,
};
trie_updates.extend(walker_updates.into_iter());
trie_updates.extend(walker_updates);
trie_updates.extend_with_account_updates(hash_builder_updates);
return Ok(StateRootProgress::Progress(
@@ -304,7 +314,7 @@ where
let (_, walker_updates) = account_node_iter.walker.split();
let (_, hash_builder_updates) = hash_builder.split();
trie_updates.extend(walker_updates.into_iter());
trie_updates.extend(walker_updates);
trie_updates.extend_with_account_updates(hash_builder_updates);
trie_updates.extend_with_deletes(
self.prefix_sets.destroyed_accounts.into_iter().map(TrieKey::StorageTrie),
@@ -314,6 +324,73 @@ where
}
}
impl<T, H> StateRoot<T, H>
where
T: TrieCursorFactory + Clone + Send + Sync,
H: HashedCursorFactory + Clone + Send + Sync,
{
/// TODO:
/// Walks the intermediate nodes of existing state trie (if any) and hashed entries. Feeds the
/// nodes into the hash builder. Collects the updates in the process.
///
/// Ignores the threshold.
///
/// # Returns
///
/// The intermediate progress of state root computation and the trie updates.
pub fn root_parallel_with_updates(self) -> Result<(B256, TrieUpdates), StateRootError> {
match self.with_no_threshold().calculate_parallel(true)? {
StateRootProgress::Complete(root, _, updates) => Ok((root, updates)),
StateRootProgress::Progress(..) => unreachable!(), // unreachable threshold
}
}
/// TODO:
/// Walks the intermediate nodes of existing state trie (if any) and hashed entries. Feeds the
/// nodes into the hash builder.
///
/// # Returns
///
/// The state root hash.
pub fn root_parallel(self) -> Result<B256, StateRootError> {
match self.calculate_parallel(false)? {
StateRootProgress::Complete(root, _, _) => Ok(root),
StateRootProgress::Progress(..) => unreachable!(), // update retention is disabled
}
}
/// TODO:
fn calculate_parallel(
mut self,
retain_updates: bool,
) -> Result<StateRootProgress, StateRootError> {
// Pre-calculate storage roots in parallel for accounts which had storage changes.
// TODO: consider pre-calculating storage root for any changed account.
debug!(target: "trie::state_root", len = self.prefix_sets.storage_prefix_sets.len(), "pre-calculating storage roots");
let (_, storage_root_results) = async_scoped::TokioScope::scope_and_block(|scope| {
for (hashed_address, prefix_set) in self.prefix_sets.storage_prefix_sets.drain() {
let trie_cursor_factory = self.trie_cursor_factory.clone();
let hashed_cursor_factory = self.hashed_cursor_factory.clone();
scope.spawn(async move {
(
hashed_address,
StorageRoot::new_hashed(
trie_cursor_factory,
hashed_cursor_factory,
hashed_address,
)
.with_prefix_set(prefix_set)
.calculate(retain_updates),
)
});
}
});
let storage_roots =
storage_root_results.into_iter().collect::<Result<HashMap<_, _>, _>>().unwrap();
self.with_storage_roots(storage_roots).calculate(retain_updates)
}
}
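calculate_parallel fans the storage-root computations out with async_scoped::TokioScope::scope_and_block, which spawns scoped tokio tasks that may borrow from the enclosing stack frame and blocks until all of them complete, returning their outputs so they can be collected into the storage_roots map keyed by hashed address. A minimal sketch of the same fan-out/collect pattern, assuming tokio is available with the rt-multi-thread feature; the inputs and the squaring task are placeholders, not reth logic:

use async_scoped::TokioScope;
use std::collections::HashMap;

fn main() {
    // scope_and_block uses block_in_place internally, so the multi-threaded
    // tokio runtime is required (Runtime::new builds one).
    let rt = tokio::runtime::Runtime::new().unwrap();
    rt.block_on(async {
        let inputs = vec![2u64, 3, 5, 7];
        // Borrowed data can be captured because the scope guarantees that all
        // spawned tasks finish before scope_and_block returns.
        let (_, results) = TokioScope::scope_and_block(|scope| {
            for n in &inputs {
                scope.spawn(async move { (*n, n * n) });
            }
        });
        // Each entry is a Result<_, JoinError>; collect the key/value pairs
        // into a map, mirroring how the storage roots are gathered above.
        let squares: HashMap<u64, u64> =
            results.into_iter().collect::<Result<_, _>>().unwrap();
        assert_eq!(squares[&5], 25);
    });
}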
/// StorageRoot is used to compute the root node of an account storage trie.
#[derive(Debug)]
pub struct StorageRoot<T, H> {
@@ -414,6 +491,13 @@ where
self,
retain_updates: bool,
) -> Result<(B256, usize, TrieUpdates), StorageRootError> {
// We assume we can always calculate a storage root without
// OOMing. This opens us up to a potential DOS vector if
// a contract had too many storage entries and they were
// all buffered w/o us returning and committing our intermediate
// progress.
// TODO: Consider introducing the TrieProgress::Progress/Complete inside StorageRoot.
trace!(target: "trie::storage_root", hashed_address = ?self.hashed_address, "calculating storage root");
let mut hashed_storage_cursor = self.hashed_cursor_factory.hashed_storage_cursor()?;
@@ -455,7 +539,7 @@ where
let (_, walker_updates) = storage_node_iter.walker.split();
let mut trie_updates = TrieUpdates::default();
trie_updates.extend(walker_updates.into_iter());
trie_updates.extend(walker_updates);
trie_updates.extend_with_storage_updates(self.hashed_address, hash_builder_updates);
trace!(target: "trie::storage_root", ?root, hashed_address = ?self.hashed_address, "calculated storage root");


@@ -1,7 +1,8 @@
use super::{TrieCursor, TrieCursorFactory};
use crate::updates::TrieKey;
use crate::{database_provider::ConsistentDatabaseProvider, updates::TrieKey};
use reth_db::{
cursor::{DbCursorRO, DbDupCursorRO},
database::Database,
tables,
transaction::DbTx,
DatabaseError,
@@ -11,6 +12,24 @@ use reth_primitives::{
B256,
};
impl<DB: Database> TrieCursorFactory for ConsistentDatabaseProvider<DB> {
fn account_trie_cursor(&self) -> Result<Box<dyn TrieCursor + '_>, DatabaseError> {
Ok(Box::new(DatabaseAccountTrieCursor::new(
self.tx()?.cursor_read::<tables::AccountsTrie>()?,
)))
}
fn storage_tries_cursor(
&self,
hashed_address: B256,
) -> Result<Box<dyn TrieCursor + '_>, DatabaseError> {
Ok(Box::new(DatabaseStorageTrieCursor::new(
self.tx()?.cursor_dup_read::<tables::StoragesTrie>()?,
hashed_address,
)))
}
}
/// Implementation of the trie cursor factory for a database transaction.
impl<'a, TX: DbTx> TrieCursorFactory for &'a TX {
fn account_trie_cursor(&self) -> Result<Box<dyn TrieCursor + '_>, DatabaseError> {


@@ -75,7 +75,7 @@ impl TrieUpdates {
}
/// Extend the updates with trie updates.
pub fn extend(&mut self, updates: impl Iterator<Item = (TrieKey, TrieOp)>) {
pub fn extend(&mut self, updates: impl IntoIterator<Item = (TrieKey, TrieOp)>) {
self.trie_operations.extend(updates);
}