From f82d143d0c8720ea6cba4b7af55a7ad17aaa3dcb Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Fri, 27 Feb 2026 11:13:31 +0000 Subject: [PATCH] refactor(engine): PayloadProcessor::spawn_state_root (#22604) --- crates/engine/tree/benches/state_root_task.rs | 2 +- .../tree/src/tree/payload_processor/mod.rs | 160 +++++++++++++----- .../src/tree/payload_processor/multiproof.rs | 5 +- .../engine/tree/src/tree/payload_validator.rs | 6 +- 4 files changed, 122 insertions(+), 51 deletions(-) diff --git a/crates/engine/tree/benches/state_root_task.rs b/crates/engine/tree/benches/state_root_task.rs index 4820c74c0b..329b3118d1 100644 --- a/crates/engine/tree/benches/state_root_task.rs +++ b/crates/engine/tree/benches/state_root_task.rs @@ -250,7 +250,7 @@ fn bench_state_root(c: &mut Criterion) { None, ); - let mut state_hook = handle.state_hook(); + let mut state_hook = handle.state_hook().expect("state hook is None"); for (i, update) in state_updates.into_iter().enumerate() { state_hook.on_state(StateChangeSource::Transaction(i), &update); diff --git a/crates/engine/tree/src/tree/payload_processor/mod.rs b/crates/engine/tree/src/tree/payload_processor/mod.rs index d0dbbb3d05..f0164ca7e7 100644 --- a/crates/engine/tree/src/tree/payload_processor/mod.rs +++ b/crates/engine/tree/src/tree/payload_processor/mod.rs @@ -282,40 +282,19 @@ where self.spawn_tx_iterator(transactions, env.transaction_count); let span = Span::current(); - let (to_multi_proof, from_multi_proof) = crossbeam_channel::unbounded(); - let parent_state_root = env.parent_state_root; - let transaction_count = env.transaction_count; - let chunk_size = config.multiproof_chunk_size(); + let state_root_handle = self.spawn_state_root(multiproof_provider_factory, &env, config); let prewarm_handle = self.spawn_caching_with( env, prewarm_rx, provider_builder, - Some(to_multi_proof.clone()), + Some(state_root_handle.to_multi_proof.clone()), bal, ); - // Create and spawn the storage proof task. - let task_ctx = ProofTaskCtx::new(multiproof_provider_factory); - let halve_workers = transaction_count <= Self::SMALL_BLOCK_PROOF_WORKER_TX_THRESHOLD; - let proof_handle = ProofWorkerHandle::new(&self.executor, task_ctx, halve_workers); - - // wire the sparse trie to the state root response receiver - let (state_root_tx, state_root_rx) = channel(); - - // Spawn the sparse trie task using any stored trie and parallel trie configuration. - self.spawn_sparse_trie_task( - proof_handle, - state_root_tx, - from_multi_proof, - parent_state_root, - chunk_size, - ); - PayloadHandle { - to_multi_proof: Some(to_multi_proof), + state_root_handle: Some(state_root_handle), prewarm_handle, - state_root: Some(state_root_rx), transactions: execution_rx, _span: span, } @@ -339,14 +318,55 @@ where self.spawn_tx_iterator(transactions, env.transaction_count); let prewarm_handle = self.spawn_caching_with(env, prewarm_rx, provider_builder, None, bal); PayloadHandle { - to_multi_proof: None, + state_root_handle: None, prewarm_handle, - state_root: None, transactions: execution_rx, _span: Span::current(), } } + /// Spawns state root computation pipeline (multiproof + sparse trie tasks). + /// + /// The returned [`StateRootHandle`] provides: + /// - [`StateRootHandle::state_hook`] — an [`OnStateHook`] to stream state updates during + /// execution. + /// - [`StateRootHandle::state_root`] — blocks until the state root is computed and returns the + /// state root. + /// + /// The state hook **must** be dropped after execution to signal the end of state updates. + #[instrument(level = "debug", target = "engine::tree::payload_processor", skip_all)] + pub fn spawn_state_root( + &mut self, + multiproof_provider_factory: F, + env: &ExecutionEnv, + config: &TreeConfig, + ) -> StateRootHandle + where + F: DatabaseProviderROFactory + + Clone + + Send + + Sync + + 'static, + { + let (to_multi_proof, from_multi_proof) = crossbeam_channel::unbounded(); + + let task_ctx = ProofTaskCtx::new(multiproof_provider_factory); + let halve_workers = env.transaction_count <= Self::SMALL_BLOCK_PROOF_WORKER_TX_THRESHOLD; + let proof_handle = ProofWorkerHandle::new(&self.executor, task_ctx, halve_workers); + + let (state_root_tx, state_root_rx) = channel(); + + self.spawn_sparse_trie_task( + proof_handle, + state_root_tx, + from_multi_proof, + env.parent_state_root, + config.multiproof_chunk_size(), + ); + + StateRootHandle::new(to_multi_proof, state_root_rx) + } + /// Transaction count threshold below which proof workers are halved, since fewer transactions /// produce fewer state changes and most workers would be idle overhead. const SMALL_BLOCK_PROOF_WORKER_TX_THRESHOLD: usize = 30; @@ -725,20 +745,79 @@ fn convert_serial( } } +/// Handle to a background state root computation task. +/// +/// Unlike [`PayloadHandle`], this does not include transaction iteration or cache prewarming. +/// It only provides access to the state root computation via [`Self::state_hook`] and +/// [`Self::state_root`]. +/// +/// Created by [`PayloadProcessor::spawn_state_root`]. +#[derive(Debug)] +pub struct StateRootHandle { + /// Channel for evm state updates to the multiproof pipeline. + to_multi_proof: CrossbeamSender, + /// Receiver for the computed state root. + state_root_rx: Option>>, +} + +impl StateRootHandle { + /// Creates a new state root handle. + pub const fn new( + to_multi_proof: CrossbeamSender, + state_root_rx: mpsc::Receiver>, + ) -> Self { + Self { to_multi_proof, state_root_rx: Some(state_root_rx) } + } + + /// Returns a state hook that streams state updates to the background state root task. + /// + /// The hook must be dropped after execution completes to signal the end of state updates. + pub fn state_hook(&self) -> impl OnStateHook { + let to_multi_proof = StateHookSender::new(self.to_multi_proof.clone()); + + move |source: StateChangeSource, state: &EvmState| { + let _ = + to_multi_proof.send(MultiProofMessage::StateUpdate(source.into(), state.clone())); + } + } + + /// Awaits the state root computation result. + /// + /// # Panics + /// + /// If called more than once. + pub fn state_root(&mut self) -> Result { + self.state_root_rx + .take() + .expect("state_root already taken") + .recv() + .map_err(|_| ParallelStateRootError::Other("sparse trie task dropped".to_string()))? + } + + /// Takes the state root receiver for use with custom waiting logic (e.g., timeouts). + /// + /// # Panics + /// + /// If called more than once. + pub const fn take_state_root_rx( + &mut self, + ) -> mpsc::Receiver> { + self.state_root_rx.take().expect("state_root already taken") + } +} + /// Handle to all the spawned tasks. /// /// Generic over `R` (receipt type) to allow sharing `Arc>` with the /// caching task without cloning the expensive `BundleState`. #[derive(Debug)] pub struct PayloadHandle { - /// Channel for evm state updates - to_multi_proof: Option>, + /// Handle to the background state root computation, if spawned. + state_root_handle: Option, // must include the receiver of the state root wired to the sparse trie prewarm_handle: CacheTaskHandle, /// Stream of block transactions transactions: mpsc::Receiver>, - /// Receiver for the state root - state_root: Option>>, /// Span for tracing _span: Span, } @@ -756,11 +835,7 @@ impl PayloadHandle { skip_all )] pub fn state_root(&mut self) -> Result { - self.state_root - .take() - .expect("state_root is None") - .recv() - .map_err(|_| ParallelStateRootError::Other("sparse trie task dropped".to_string()))? + self.state_root_handle.as_mut().expect("state_root_handle is None").state_root() } /// Takes the state root receiver out of the handle for use with custom waiting logic @@ -772,21 +847,14 @@ impl PayloadHandle { pub const fn take_state_root_rx( &mut self, ) -> mpsc::Receiver> { - self.state_root.take().expect("state_root is None") + self.state_root_handle.as_mut().expect("state_root_handle is None").take_state_root_rx() } /// Returns a state hook to be used to send state updates to this task. /// /// If a multiproof task is spawned the hook will notify it about new states. - pub fn state_hook(&self) -> impl OnStateHook { - // convert the channel into a `StateHookSender` that emits an event on drop - let to_multi_proof = self.to_multi_proof.clone().map(StateHookSender::new); - - move |source: StateChangeSource, state: &EvmState| { - if let Some(sender) = &to_multi_proof { - let _ = sender.send(MultiProofMessage::StateUpdate(source.into(), state.clone())); - } - } + pub fn state_hook(&self) -> Option { + self.state_root_handle.as_ref().map(|handle| handle.state_hook()) } /// Returns a clone of the caches used by prewarming @@ -1364,7 +1432,7 @@ mod tests { None, // No BAL for test ); - let mut state_hook = handle.state_hook(); + let mut state_hook = handle.state_hook().expect("state hook is None"); for (i, update) in state_updates.into_iter().enumerate() { state_hook.on_state(StateChangeSource::Transaction(i), &update); diff --git a/crates/engine/tree/src/tree/payload_processor/multiproof.rs b/crates/engine/tree/src/tree/payload_processor/multiproof.rs index 31695a2393..2bf78bb4b9 100644 --- a/crates/engine/tree/src/tree/payload_processor/multiproof.rs +++ b/crates/engine/tree/src/tree/payload_processor/multiproof.rs @@ -77,10 +77,11 @@ pub enum MultiProofMessage { /// This should trigger once the block has been executed (after) the last state update has been /// sent. This triggers the exit condition of the multi proof task. #[derive(Deref, Debug)] -pub(super) struct StateHookSender(CrossbeamSender); +pub struct StateHookSender(CrossbeamSender); impl StateHookSender { - pub(crate) const fn new(inner: CrossbeamSender) -> Self { + /// Creates a new [`StateHookSender`] wrapping the given channel sender. + pub const fn new(inner: CrossbeamSender) -> Self { Self(inner) } } diff --git a/crates/engine/tree/src/tree/payload_validator.rs b/crates/engine/tree/src/tree/payload_validator.rs index 7416feffb8..b7fc27b241 100644 --- a/crates/engine/tree/src/tree/payload_validator.rs +++ b/crates/engine/tree/src/tree/payload_validator.rs @@ -27,7 +27,7 @@ use reth_engine_primitives::{ use reth_errors::{BlockExecutionError, ProviderResult}; use reth_evm::{ block::BlockExecutor, execute::ExecutableTxFor, ConfigureEvm, EvmEnvFor, ExecutionCtxFor, - SpecFor, + OnStateHook, SpecFor, }; use reth_payload_primitives::{ BuiltPayload, InvalidPayloadAttributesError, NewPayloadError, PayloadTypes, @@ -866,7 +866,9 @@ where .spawn_blocking_named("receipt-root", move || task_handle.run(receipts_len)); let transaction_count = input.transaction_count(); - let executor = executor.with_state_hook(Some(Box::new(handle.state_hook()))); + let executor = executor.with_state_hook( + handle.state_hook().map(|hook| Box::new(hook) as Box), + ); let execution_start = Instant::now();