From a6a074210cee668ffb8639a7286757c48dd08a33 Mon Sep 17 00:00:00 2001 From: Arsenii Kulikov Date: Fri, 6 Feb 2026 00:32:07 +0400 Subject: [PATCH] perf: hash state updates in parallel (#21836) --- .../tree/src/tree/payload_processor/mod.rs | 2 + .../src/tree/payload_processor/sparse_trie.rs | 83 ++++++++++++++----- 2 files changed, 66 insertions(+), 19 deletions(-) diff --git a/crates/engine/tree/src/tree/payload_processor/mod.rs b/crates/engine/tree/src/tree/payload_processor/mod.rs index 6a9c0b03c2..b132c7faf8 100644 --- a/crates/engine/tree/src/tree/payload_processor/mod.rs +++ b/crates/engine/tree/src/tree/payload_processor/mod.rs @@ -517,6 +517,7 @@ where let max_storage_tries = self.sparse_trie_max_storage_tries; let chunk_size = config.multiproof_chunking_enabled().then_some(config.multiproof_chunk_size()); + let executor = self.executor.clone(); let parent_span = Span::current(); self.executor.spawn_blocking(move || { @@ -560,6 +561,7 @@ where )) } else { SpawnedSparseTrieTask::Cached(SparseTrieCacheTask::new_with_trie( + &executor, from_multi_proof, proof_worker_handle, trie_metrics.clone(), diff --git a/crates/engine/tree/src/tree/payload_processor/sparse_trie.rs b/crates/engine/tree/src/tree/payload_processor/sparse_trie.rs index 905c2da435..b8421206b7 100644 --- a/crates/engine/tree/src/tree/payload_processor/sparse_trie.rs +++ b/crates/engine/tree/src/tree/payload_processor/sparse_trie.rs @@ -1,5 +1,6 @@ //! Sparse Trie task related functionality. +use super::executor::WorkloadExecutor; use crate::tree::{ multiproof::{ dispatch_with_chunking, evm_state_to_hashed_post_state, MultiProofMessage, @@ -12,7 +13,6 @@ use alloy_rlp::{Decodable, Encodable}; use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender}; use rayon::iter::{IntoParallelRefMutIterator, ParallelIterator}; use reth_primitives_traits::{Account, ParallelBridgeBuffered}; -use reth_revm::state::EvmState; use reth_trie::{ proof_v2::Target, updates::TrieUpdates, DecodedMultiProofV2, HashedPostState, Nibbles, TrieAccount, EMPTY_ROOT_HASH, TRIE_ACCOUNT_RLP_MAX_SIZE, @@ -218,7 +218,7 @@ pub(super) struct SparseTrieCacheTask, /// Receives updates from execution and prewarming. - updates: CrossbeamReceiver, + updates: CrossbeamReceiver, /// `SparseStateTrie` used for computing the state root. trie: SparseStateTrie, /// Handle to the proof worker pools (storage and account). @@ -282,6 +282,7 @@ where { /// Creates a new sparse trie, pre-populating with an existing [`SparseStateTrie`]. pub(super) fn new_with_trie( + executor: &WorkloadExecutor, updates: CrossbeamReceiver, proof_worker_handle: ProofWorkerHandle, metrics: MultiProofTaskMetrics, @@ -289,10 +290,18 @@ where chunk_size: Option, ) -> Self { let (proof_result_tx, proof_result_rx) = crossbeam_channel::unbounded(); + let (hashed_state_tx, hashed_state_rx) = crossbeam_channel::unbounded(); + + let parent_span = tracing::Span::current(); + executor.spawn_blocking(move || { + let _span = debug_span!(parent: parent_span, "run_hashing_task").entered(); + Self::run_hashing_task(updates, hashed_state_tx) + }); + Self { proof_result_tx, proof_result_rx, - updates, + updates: hashed_state_rx, proof_worker_handle, trie, chunk_size, @@ -312,6 +321,35 @@ where } } + /// Runs the hashing task that drains updates from the channel and converts them to + /// `HashedPostState` in parallel. + fn run_hashing_task( + updates: CrossbeamReceiver, + hashed_state_tx: CrossbeamSender, + ) { + while let Ok(message) = updates.recv() { + let msg = match message { + MultiProofMessage::PrefetchProofs(targets) => { + SparseTrieTaskMessage::PrefetchProofs(targets) + } + MultiProofMessage::StateUpdate(_, state) => { + let _span = debug_span!(target: "engine::tree::payload_processor::sparse_trie", "hashing state update", update_len = state.len()).entered(); + let hashed = evm_state_to_hashed_post_state(state); + SparseTrieTaskMessage::HashedState(hashed) + } + MultiProofMessage::FinishedStateUpdates => { + SparseTrieTaskMessage::FinishedStateUpdates + } + MultiProofMessage::EmptyProof { .. } | MultiProofMessage::BlockAccessList(_) => { + continue + } + }; + if hashed_state_tx.send(msg).is_err() { + break; + } + } + } + /// Prunes and shrinks the trie for reuse in the next payload built on top of this one. /// /// Should be called after the state root result has been sent. @@ -347,8 +385,8 @@ where /// Runs the sparse trie task to completion. /// - /// This waits for new incoming [`MultiProofMessage`]s, applies updates to the trie and - /// schedules proof fetching when needed. + /// This waits for new incoming [`SparseTrieTaskMessage`]s, applies updates + /// to the trie and schedules proof fetching when needed. /// /// This concludes once the last state update has been received and processed. #[instrument( @@ -370,7 +408,7 @@ where } }; - self.on_multiproof_message(update); + self.on_message(update); self.pending_updates += 1; } recv(self.proof_result_rx) -> message => { @@ -432,14 +470,14 @@ where Ok(StateRootComputeOutcome { state_root, trie_updates }) } - /// Processes a [`MultiProofMessage`]. - fn on_multiproof_message(&mut self, message: MultiProofMessage) { + /// Processes a [`SparseTrieTaskMessage`] from the hashing task. + fn on_message(&mut self, message: SparseTrieTaskMessage) { match message { - MultiProofMessage::PrefetchProofs(targets) => self.on_prewarm_targets(targets), - MultiProofMessage::StateUpdate(_, state) => self.on_state_update(state), - MultiProofMessage::EmptyProof { .. } => unreachable!(), - MultiProofMessage::BlockAccessList(_) => todo!(), - MultiProofMessage::FinishedStateUpdates => self.finished_state_updates = true, + SparseTrieTaskMessage::PrefetchProofs(targets) => self.on_prewarm_targets(targets), + SparseTrieTaskMessage::HashedState(hashed_state) => { + self.on_hashed_state_update(hashed_state) + } + SparseTrieTaskMessage::FinishedStateUpdates => self.finished_state_updates = true, } } @@ -474,16 +512,13 @@ where } } - /// Processes a state update and encodes all state changes as trie updates. + /// Processes a hashed state update and encodes all state changes as trie updates. #[instrument( level = "debug", target = "engine::tree::payload_processor::sparse_trie", - skip_all, - fields(accounts = update.len()) + skip_all )] - fn on_state_update(&mut self, update: EvmState) { - let hashed_state_update = evm_state_to_hashed_post_state(update); - + fn on_hashed_state_update(&mut self, hashed_state_update: HashedPostState) { for (address, storage) in hashed_state_update.storages { for (slot, value) in storage.storage { let encoded = if value.is_zero() { @@ -809,6 +844,16 @@ where } } +/// Message type for the sparse trie task. +enum SparseTrieTaskMessage { + /// A hashed state update ready to be processed. + HashedState(HashedPostState), + /// Prefetch proof targets (passed through directly). + PrefetchProofs(VersionedMultiProofTargets), + /// Signals that all state updates have been received. + FinishedStateUpdates, +} + /// Outcome of the state root computation, including the state root itself with /// the trie updates. #[derive(Debug)]