mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-02-19 03:04:27 -05:00
perf: hash state updates in parallel (#21836)
This commit is contained in:
@@ -517,6 +517,7 @@ where
|
||||
let max_storage_tries = self.sparse_trie_max_storage_tries;
|
||||
let chunk_size =
|
||||
config.multiproof_chunking_enabled().then_some(config.multiproof_chunk_size());
|
||||
let executor = self.executor.clone();
|
||||
|
||||
let parent_span = Span::current();
|
||||
self.executor.spawn_blocking(move || {
|
||||
@@ -560,6 +561,7 @@ where
|
||||
))
|
||||
} else {
|
||||
SpawnedSparseTrieTask::Cached(SparseTrieCacheTask::new_with_trie(
|
||||
&executor,
|
||||
from_multi_proof,
|
||||
proof_worker_handle,
|
||||
trie_metrics.clone(),
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
//! Sparse Trie task related functionality.
|
||||
|
||||
use super::executor::WorkloadExecutor;
|
||||
use crate::tree::{
|
||||
multiproof::{
|
||||
dispatch_with_chunking, evm_state_to_hashed_post_state, MultiProofMessage,
|
||||
@@ -12,7 +13,6 @@ use alloy_rlp::{Decodable, Encodable};
|
||||
use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
|
||||
use rayon::iter::{IntoParallelRefMutIterator, ParallelIterator};
|
||||
use reth_primitives_traits::{Account, ParallelBridgeBuffered};
|
||||
use reth_revm::state::EvmState;
|
||||
use reth_trie::{
|
||||
proof_v2::Target, updates::TrieUpdates, DecodedMultiProofV2, HashedPostState, Nibbles,
|
||||
TrieAccount, EMPTY_ROOT_HASH, TRIE_ACCOUNT_RLP_MAX_SIZE,
|
||||
@@ -218,7 +218,7 @@ pub(super) struct SparseTrieCacheTask<A = SerialSparseTrie, S = SerialSparseTrie
|
||||
/// Receiver for proof results directly from workers.
|
||||
proof_result_rx: CrossbeamReceiver<ProofResultMessage>,
|
||||
/// Receives updates from execution and prewarming.
|
||||
updates: CrossbeamReceiver<MultiProofMessage>,
|
||||
updates: CrossbeamReceiver<SparseTrieTaskMessage>,
|
||||
/// `SparseStateTrie` used for computing the state root.
|
||||
trie: SparseStateTrie<A, S>,
|
||||
/// Handle to the proof worker pools (storage and account).
|
||||
@@ -282,6 +282,7 @@ where
|
||||
{
|
||||
/// Creates a new sparse trie, pre-populating with an existing [`SparseStateTrie`].
|
||||
pub(super) fn new_with_trie(
|
||||
executor: &WorkloadExecutor,
|
||||
updates: CrossbeamReceiver<MultiProofMessage>,
|
||||
proof_worker_handle: ProofWorkerHandle,
|
||||
metrics: MultiProofTaskMetrics,
|
||||
@@ -289,10 +290,18 @@ where
|
||||
chunk_size: Option<usize>,
|
||||
) -> Self {
|
||||
let (proof_result_tx, proof_result_rx) = crossbeam_channel::unbounded();
|
||||
let (hashed_state_tx, hashed_state_rx) = crossbeam_channel::unbounded();
|
||||
|
||||
let parent_span = tracing::Span::current();
|
||||
executor.spawn_blocking(move || {
|
||||
let _span = debug_span!(parent: parent_span, "run_hashing_task").entered();
|
||||
Self::run_hashing_task(updates, hashed_state_tx)
|
||||
});
|
||||
|
||||
Self {
|
||||
proof_result_tx,
|
||||
proof_result_rx,
|
||||
updates,
|
||||
updates: hashed_state_rx,
|
||||
proof_worker_handle,
|
||||
trie,
|
||||
chunk_size,
|
||||
@@ -312,6 +321,35 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
/// Runs the hashing task that drains updates from the channel and converts them to
|
||||
/// `HashedPostState` in parallel.
|
||||
fn run_hashing_task(
|
||||
updates: CrossbeamReceiver<MultiProofMessage>,
|
||||
hashed_state_tx: CrossbeamSender<SparseTrieTaskMessage>,
|
||||
) {
|
||||
while let Ok(message) = updates.recv() {
|
||||
let msg = match message {
|
||||
MultiProofMessage::PrefetchProofs(targets) => {
|
||||
SparseTrieTaskMessage::PrefetchProofs(targets)
|
||||
}
|
||||
MultiProofMessage::StateUpdate(_, state) => {
|
||||
let _span = debug_span!(target: "engine::tree::payload_processor::sparse_trie", "hashing state update", update_len = state.len()).entered();
|
||||
let hashed = evm_state_to_hashed_post_state(state);
|
||||
SparseTrieTaskMessage::HashedState(hashed)
|
||||
}
|
||||
MultiProofMessage::FinishedStateUpdates => {
|
||||
SparseTrieTaskMessage::FinishedStateUpdates
|
||||
}
|
||||
MultiProofMessage::EmptyProof { .. } | MultiProofMessage::BlockAccessList(_) => {
|
||||
continue
|
||||
}
|
||||
};
|
||||
if hashed_state_tx.send(msg).is_err() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Prunes and shrinks the trie for reuse in the next payload built on top of this one.
|
||||
///
|
||||
/// Should be called after the state root result has been sent.
|
||||
@@ -347,8 +385,8 @@ where
|
||||
|
||||
/// Runs the sparse trie task to completion.
|
||||
///
|
||||
/// This waits for new incoming [`MultiProofMessage`]s, applies updates to the trie and
|
||||
/// schedules proof fetching when needed.
|
||||
/// This waits for new incoming [`SparseTrieTaskMessage`]s, applies updates
|
||||
/// to the trie and schedules proof fetching when needed.
|
||||
///
|
||||
/// This concludes once the last state update has been received and processed.
|
||||
#[instrument(
|
||||
@@ -370,7 +408,7 @@ where
|
||||
}
|
||||
};
|
||||
|
||||
self.on_multiproof_message(update);
|
||||
self.on_message(update);
|
||||
self.pending_updates += 1;
|
||||
}
|
||||
recv(self.proof_result_rx) -> message => {
|
||||
@@ -432,14 +470,14 @@ where
|
||||
Ok(StateRootComputeOutcome { state_root, trie_updates })
|
||||
}
|
||||
|
||||
/// Processes a [`MultiProofMessage`].
|
||||
fn on_multiproof_message(&mut self, message: MultiProofMessage) {
|
||||
/// Processes a [`SparseTrieTaskMessage`] from the hashing task.
|
||||
fn on_message(&mut self, message: SparseTrieTaskMessage) {
|
||||
match message {
|
||||
MultiProofMessage::PrefetchProofs(targets) => self.on_prewarm_targets(targets),
|
||||
MultiProofMessage::StateUpdate(_, state) => self.on_state_update(state),
|
||||
MultiProofMessage::EmptyProof { .. } => unreachable!(),
|
||||
MultiProofMessage::BlockAccessList(_) => todo!(),
|
||||
MultiProofMessage::FinishedStateUpdates => self.finished_state_updates = true,
|
||||
SparseTrieTaskMessage::PrefetchProofs(targets) => self.on_prewarm_targets(targets),
|
||||
SparseTrieTaskMessage::HashedState(hashed_state) => {
|
||||
self.on_hashed_state_update(hashed_state)
|
||||
}
|
||||
SparseTrieTaskMessage::FinishedStateUpdates => self.finished_state_updates = true,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -474,16 +512,13 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
/// Processes a state update and encodes all state changes as trie updates.
|
||||
/// Processes a hashed state update and encodes all state changes as trie updates.
|
||||
#[instrument(
|
||||
level = "debug",
|
||||
target = "engine::tree::payload_processor::sparse_trie",
|
||||
skip_all,
|
||||
fields(accounts = update.len())
|
||||
skip_all
|
||||
)]
|
||||
fn on_state_update(&mut self, update: EvmState) {
|
||||
let hashed_state_update = evm_state_to_hashed_post_state(update);
|
||||
|
||||
fn on_hashed_state_update(&mut self, hashed_state_update: HashedPostState) {
|
||||
for (address, storage) in hashed_state_update.storages {
|
||||
for (slot, value) in storage.storage {
|
||||
let encoded = if value.is_zero() {
|
||||
@@ -809,6 +844,16 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
/// Message type for the sparse trie task.
|
||||
enum SparseTrieTaskMessage {
|
||||
/// A hashed state update ready to be processed.
|
||||
HashedState(HashedPostState),
|
||||
/// Prefetch proof targets (passed through directly).
|
||||
PrefetchProofs(VersionedMultiProofTargets),
|
||||
/// Signals that all state updates have been received.
|
||||
FinishedStateUpdates,
|
||||
}
|
||||
|
||||
/// Outcome of the state root computation, including the state root itself with
|
||||
/// the trie updates.
|
||||
#[derive(Debug)]
|
||||
|
||||
Reference in New Issue
Block a user