mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-04-08 03:01:12 -04:00
perf(engine): migrate to AsyncStateRoot (#10927)
Co-authored-by: Roman Krasiuk <rokrassyuk@gmail.com>
This commit is contained in:
@@ -333,6 +333,9 @@ pub enum InsertBlockErrorKindTwo {
|
||||
/// Provider error.
|
||||
#[error(transparent)]
|
||||
Provider(#[from] ProviderError),
|
||||
/// Other errors.
|
||||
#[error(transparent)]
|
||||
Other(#[from] Box<dyn std::error::Error + Send + Sync + 'static>),
|
||||
}
|
||||
|
||||
impl InsertBlockErrorKindTwo {
|
||||
@@ -365,6 +368,7 @@ impl InsertBlockErrorKindTwo {
|
||||
}
|
||||
}
|
||||
Self::Provider(err) => Err(InsertBlockFatalError::Provider(err)),
|
||||
Self::Other(err) => Err(InternalBlockExecutionError::Other(err).into()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -43,7 +43,7 @@ use reth_rpc_types::{
|
||||
};
|
||||
use reth_stages_api::ControlFlow;
|
||||
use reth_trie::{updates::TrieUpdates, HashedPostState, TrieInput};
|
||||
use reth_trie_parallel::parallel_root::ParallelStateRoot;
|
||||
use reth_trie_parallel::async_root::{AsyncStateRoot, AsyncStateRootError};
|
||||
use std::{
|
||||
cmp::Ordering,
|
||||
collections::{btree_map, hash_map, BTreeMap, HashMap, HashSet, VecDeque},
|
||||
@@ -549,6 +549,7 @@ where
|
||||
config: TreeConfig,
|
||||
) -> Self {
|
||||
let (incoming_tx, incoming) = std::sync::mpsc::channel();
|
||||
|
||||
Self {
|
||||
provider,
|
||||
executor_provider,
|
||||
@@ -2193,14 +2194,14 @@ where
|
||||
let persistence_in_progress = self.persistence_state.in_progress();
|
||||
if !persistence_in_progress {
|
||||
state_root_result = match self
|
||||
.compute_state_root_in_parallel(block.parent_hash, &hashed_state)
|
||||
.compute_state_root_async(block.parent_hash, &hashed_state)
|
||||
{
|
||||
Ok((state_root, trie_output)) => Some((state_root, trie_output)),
|
||||
Err(ProviderError::ConsistentView(error)) => {
|
||||
debug!(target: "engine::tree", %error, "Parallel state root computation failed consistency check, falling back");
|
||||
Err(AsyncStateRootError::Provider(ProviderError::ConsistentView(error))) => {
|
||||
debug!(target: "engine", %error, "Async state root computation failed consistency check, falling back");
|
||||
None
|
||||
}
|
||||
Err(error) => return Err(error.into()),
|
||||
Err(error) => return Err(InsertBlockErrorKindTwo::Other(Box::new(error))),
|
||||
};
|
||||
}
|
||||
|
||||
@@ -2263,19 +2264,20 @@ where
|
||||
Ok(InsertPayloadOk2::Inserted(BlockStatus2::Valid))
|
||||
}
|
||||
|
||||
/// Compute state root for the given hashed post state in parallel.
|
||||
/// Compute state root for the given hashed post state asynchronously.
|
||||
///
|
||||
/// # Returns
|
||||
///
|
||||
/// Returns `Ok(_)` if computed successfully.
|
||||
/// Returns `Err(_)` if error was encountered during computation.
|
||||
/// `Err(ProviderError::ConsistentView(_))` can be safely ignored and fallback computation
|
||||
|
||||
/// should be used instead.
|
||||
fn compute_state_root_in_parallel(
|
||||
fn compute_state_root_async(
|
||||
&self,
|
||||
parent_hash: B256,
|
||||
hashed_state: &HashedPostState,
|
||||
) -> ProviderResult<(B256, TrieUpdates)> {
|
||||
) -> Result<(B256, TrieUpdates), AsyncStateRootError> {
|
||||
let consistent_view = ConsistentDbView::new_with_latest_tip(self.provider.clone())?;
|
||||
let mut input = TrieInput::default();
|
||||
|
||||
@@ -2297,7 +2299,7 @@ where
|
||||
// Extend with block we are validating root for.
|
||||
input.append_ref(hashed_state);
|
||||
|
||||
Ok(ParallelStateRoot::new(consistent_view, input).incremental_root_with_updates()?)
|
||||
AsyncStateRoot::new(consistent_view, input).incremental_root_with_updates()
|
||||
}
|
||||
|
||||
/// Handles an error that occurred while inserting a block.
|
||||
|
||||
@@ -73,7 +73,7 @@ pub fn calculate_state_root(c: &mut Criterion) {
|
||||
|
||||
// async root
|
||||
group.bench_function(BenchmarkId::new("async root", size), |b| {
|
||||
b.to_async(&runtime).iter_with_setup(
|
||||
b.iter_with_setup(
|
||||
|| AsyncStateRoot::new(view.clone(), TrieInput::from_state(updated_state.clone())),
|
||||
|calculator| calculator.incremental_root(),
|
||||
);
|
||||
|
||||
@@ -19,7 +19,6 @@ use reth_trie::{
|
||||
use reth_trie_db::{DatabaseHashedCursorFactory, DatabaseTrieCursorFactory};
|
||||
use std::{collections::HashMap, sync::Arc};
|
||||
use thiserror::Error;
|
||||
use tokio::sync::oneshot;
|
||||
use tracing::*;
|
||||
|
||||
/// Async state root calculator.
|
||||
@@ -63,21 +62,16 @@ where
|
||||
Factory: DatabaseProviderFactory<Provider: BlockReader> + Clone + Send + Sync + 'static,
|
||||
{
|
||||
/// Calculate incremental state root asynchronously.
|
||||
pub async fn incremental_root(self) -> Result<B256, AsyncStateRootError> {
|
||||
self.calculate(false).await.map(|(root, _)| root)
|
||||
pub fn incremental_root(self) -> Result<B256, AsyncStateRootError> {
|
||||
self.calculate(false).map(|(root, _)| root)
|
||||
}
|
||||
|
||||
/// Calculate incremental state root with updates asynchronously.
|
||||
pub async fn incremental_root_with_updates(
|
||||
self,
|
||||
) -> Result<(B256, TrieUpdates), AsyncStateRootError> {
|
||||
self.calculate(true).await
|
||||
pub fn incremental_root_with_updates(self) -> Result<(B256, TrieUpdates), AsyncStateRootError> {
|
||||
self.calculate(true)
|
||||
}
|
||||
|
||||
async fn calculate(
|
||||
self,
|
||||
retain_updates: bool,
|
||||
) -> Result<(B256, TrieUpdates), AsyncStateRootError> {
|
||||
fn calculate(self, retain_updates: bool) -> Result<(B256, TrieUpdates), AsyncStateRootError> {
|
||||
let mut tracker = ParallelTrieTracker::default();
|
||||
let trie_nodes_sorted = Arc::new(self.input.nodes.into_sorted());
|
||||
let hashed_state_sorted = Arc::new(self.input.state.into_sorted());
|
||||
@@ -100,7 +94,7 @@ where
|
||||
#[cfg(feature = "metrics")]
|
||||
let metrics = self.metrics.storage_trie.clone();
|
||||
|
||||
let (tx, rx) = oneshot::channel();
|
||||
let (tx, rx) = std::sync::mpsc::sync_channel(1);
|
||||
|
||||
rayon::spawn_fifo(move || {
|
||||
let result = (|| -> Result<_, AsyncStateRootError> {
|
||||
@@ -160,7 +154,7 @@ where
|
||||
}
|
||||
TrieElement::Leaf(hashed_address, account) => {
|
||||
let (storage_root, _, updates) = match storage_roots.remove(&hashed_address) {
|
||||
Some(rx) => rx.await.map_err(|_| {
|
||||
Some(rx) => rx.recv().map_err(|_| {
|
||||
AsyncStateRootError::StorageRootChannelClosed { hashed_address }
|
||||
})??,
|
||||
// Since we do not store all intermediate nodes in the database, there might
|
||||
@@ -227,6 +221,9 @@ pub enum AsyncStateRootError {
|
||||
/// The hashed address for which channel was closed.
|
||||
hashed_address: B256,
|
||||
},
|
||||
/// Receive error
|
||||
#[error(transparent)]
|
||||
Receive(#[from] std::sync::mpsc::RecvError),
|
||||
/// Error while calculating storage root.
|
||||
#[error(transparent)]
|
||||
StorageRoot(#[from] StorageRootError),
|
||||
@@ -292,7 +289,6 @@ mod tests {
|
||||
assert_eq!(
|
||||
AsyncStateRoot::new(consistent_view.clone(), Default::default(),)
|
||||
.incremental_root()
|
||||
.await
|
||||
.unwrap(),
|
||||
test_utils::state_root(state.clone())
|
||||
);
|
||||
@@ -323,9 +319,8 @@ mod tests {
|
||||
}
|
||||
|
||||
assert_eq!(
|
||||
AsyncStateRoot::new(consistent_view.clone(), TrieInput::from_state(hashed_state))
|
||||
AsyncStateRoot::new(consistent_view, TrieInput::from_state(hashed_state))
|
||||
.incremental_root()
|
||||
.await
|
||||
.unwrap(),
|
||||
test_utils::state_root(state)
|
||||
);
|
||||
|
||||
Reference in New Issue
Block a user