refactor(engine): PayloadProcessor::spawn_state_root (#22604)

This commit is contained in:
Alexey Shekhirin
2026-02-27 11:13:31 +00:00
committed by GitHub
parent bebc532e0e
commit f82d143d0c
4 changed files with 122 additions and 51 deletions

View File

@@ -250,7 +250,7 @@ fn bench_state_root(c: &mut Criterion) {
None,
);
let mut state_hook = handle.state_hook();
let mut state_hook = handle.state_hook().expect("state hook is None");
for (i, update) in state_updates.into_iter().enumerate() {
state_hook.on_state(StateChangeSource::Transaction(i), &update);

View File

@@ -282,40 +282,19 @@ where
self.spawn_tx_iterator(transactions, env.transaction_count);
let span = Span::current();
let (to_multi_proof, from_multi_proof) = crossbeam_channel::unbounded();
let parent_state_root = env.parent_state_root;
let transaction_count = env.transaction_count;
let chunk_size = config.multiproof_chunk_size();
let state_root_handle = self.spawn_state_root(multiproof_provider_factory, &env, config);
let prewarm_handle = self.spawn_caching_with(
env,
prewarm_rx,
provider_builder,
Some(to_multi_proof.clone()),
Some(state_root_handle.to_multi_proof.clone()),
bal,
);
// Create and spawn the storage proof task.
let task_ctx = ProofTaskCtx::new(multiproof_provider_factory);
let halve_workers = transaction_count <= Self::SMALL_BLOCK_PROOF_WORKER_TX_THRESHOLD;
let proof_handle = ProofWorkerHandle::new(&self.executor, task_ctx, halve_workers);
// wire the sparse trie to the state root response receiver
let (state_root_tx, state_root_rx) = channel();
// Spawn the sparse trie task using any stored trie and parallel trie configuration.
self.spawn_sparse_trie_task(
proof_handle,
state_root_tx,
from_multi_proof,
parent_state_root,
chunk_size,
);
PayloadHandle {
to_multi_proof: Some(to_multi_proof),
state_root_handle: Some(state_root_handle),
prewarm_handle,
state_root: Some(state_root_rx),
transactions: execution_rx,
_span: span,
}
@@ -339,14 +318,55 @@ where
self.spawn_tx_iterator(transactions, env.transaction_count);
let prewarm_handle = self.spawn_caching_with(env, prewarm_rx, provider_builder, None, bal);
PayloadHandle {
to_multi_proof: None,
state_root_handle: None,
prewarm_handle,
state_root: None,
transactions: execution_rx,
_span: Span::current(),
}
}
/// Spawns state root computation pipeline (multiproof + sparse trie tasks).
///
/// The returned [`StateRootHandle`] provides:
/// - [`StateRootHandle::state_hook`] — an [`OnStateHook`] to stream state updates during
/// execution.
/// - [`StateRootHandle::state_root`] — blocks until the state root is computed and returns the
/// state root.
///
/// The state hook **must** be dropped after execution to signal the end of state updates.
#[instrument(level = "debug", target = "engine::tree::payload_processor", skip_all)]
pub fn spawn_state_root<F>(
&mut self,
multiproof_provider_factory: F,
env: &ExecutionEnv<Evm>,
config: &TreeConfig,
) -> StateRootHandle
where
F: DatabaseProviderROFactory<Provider: TrieCursorFactory + HashedCursorFactory>
+ Clone
+ Send
+ Sync
+ 'static,
{
let (to_multi_proof, from_multi_proof) = crossbeam_channel::unbounded();
let task_ctx = ProofTaskCtx::new(multiproof_provider_factory);
let halve_workers = env.transaction_count <= Self::SMALL_BLOCK_PROOF_WORKER_TX_THRESHOLD;
let proof_handle = ProofWorkerHandle::new(&self.executor, task_ctx, halve_workers);
let (state_root_tx, state_root_rx) = channel();
self.spawn_sparse_trie_task(
proof_handle,
state_root_tx,
from_multi_proof,
env.parent_state_root,
config.multiproof_chunk_size(),
);
StateRootHandle::new(to_multi_proof, state_root_rx)
}
/// Transaction count threshold below which proof workers are halved, since fewer transactions
/// produce fewer state changes and most workers would be idle overhead.
const SMALL_BLOCK_PROOF_WORKER_TX_THRESHOLD: usize = 30;
@@ -725,20 +745,79 @@ fn convert_serial<RawTx, Tx, TxEnv, InnerTx, Recovered, Err, C>(
}
}
/// Handle to a background state root computation task.
///
/// Unlike [`PayloadHandle`], this does not include transaction iteration or cache prewarming.
/// It only provides access to the state root computation via [`Self::state_hook`] and
/// [`Self::state_root`].
///
/// Created by [`PayloadProcessor::spawn_state_root`].
#[derive(Debug)]
pub struct StateRootHandle {
/// Channel for evm state updates to the multiproof pipeline.
to_multi_proof: CrossbeamSender<MultiProofMessage>,
/// Receiver for the computed state root.
state_root_rx: Option<mpsc::Receiver<Result<StateRootComputeOutcome, ParallelStateRootError>>>,
}
impl StateRootHandle {
/// Creates a new state root handle.
pub const fn new(
to_multi_proof: CrossbeamSender<MultiProofMessage>,
state_root_rx: mpsc::Receiver<Result<StateRootComputeOutcome, ParallelStateRootError>>,
) -> Self {
Self { to_multi_proof, state_root_rx: Some(state_root_rx) }
}
/// Returns a state hook that streams state updates to the background state root task.
///
/// The hook must be dropped after execution completes to signal the end of state updates.
pub fn state_hook(&self) -> impl OnStateHook {
let to_multi_proof = StateHookSender::new(self.to_multi_proof.clone());
move |source: StateChangeSource, state: &EvmState| {
let _ =
to_multi_proof.send(MultiProofMessage::StateUpdate(source.into(), state.clone()));
}
}
/// Awaits the state root computation result.
///
/// # Panics
///
/// If called more than once.
pub fn state_root(&mut self) -> Result<StateRootComputeOutcome, ParallelStateRootError> {
self.state_root_rx
.take()
.expect("state_root already taken")
.recv()
.map_err(|_| ParallelStateRootError::Other("sparse trie task dropped".to_string()))?
}
/// Takes the state root receiver for use with custom waiting logic (e.g., timeouts).
///
/// # Panics
///
/// If called more than once.
pub const fn take_state_root_rx(
&mut self,
) -> mpsc::Receiver<Result<StateRootComputeOutcome, ParallelStateRootError>> {
self.state_root_rx.take().expect("state_root already taken")
}
}
/// Handle to all the spawned tasks.
///
/// Generic over `R` (receipt type) to allow sharing `Arc<ExecutionOutcome<R>>` with the
/// caching task without cloning the expensive `BundleState`.
#[derive(Debug)]
pub struct PayloadHandle<Tx, Err, R> {
/// Channel for evm state updates
to_multi_proof: Option<CrossbeamSender<MultiProofMessage>>,
/// Handle to the background state root computation, if spawned.
state_root_handle: Option<StateRootHandle>,
// must include the receiver of the state root wired to the sparse trie
prewarm_handle: CacheTaskHandle<R>,
/// Stream of block transactions
transactions: mpsc::Receiver<Result<Tx, Err>>,
/// Receiver for the state root
state_root: Option<mpsc::Receiver<Result<StateRootComputeOutcome, ParallelStateRootError>>>,
/// Span for tracing
_span: Span,
}
@@ -756,11 +835,7 @@ impl<Tx, Err, R: Send + Sync + 'static> PayloadHandle<Tx, Err, R> {
skip_all
)]
pub fn state_root(&mut self) -> Result<StateRootComputeOutcome, ParallelStateRootError> {
self.state_root
.take()
.expect("state_root is None")
.recv()
.map_err(|_| ParallelStateRootError::Other("sparse trie task dropped".to_string()))?
self.state_root_handle.as_mut().expect("state_root_handle is None").state_root()
}
/// Takes the state root receiver out of the handle for use with custom waiting logic
@@ -772,21 +847,14 @@ impl<Tx, Err, R: Send + Sync + 'static> PayloadHandle<Tx, Err, R> {
pub const fn take_state_root_rx(
&mut self,
) -> mpsc::Receiver<Result<StateRootComputeOutcome, ParallelStateRootError>> {
self.state_root.take().expect("state_root is None")
self.state_root_handle.as_mut().expect("state_root_handle is None").take_state_root_rx()
}
/// Returns a state hook to be used to send state updates to this task.
///
/// If a multiproof task is spawned the hook will notify it about new states.
pub fn state_hook(&self) -> impl OnStateHook {
// convert the channel into a `StateHookSender` that emits an event on drop
let to_multi_proof = self.to_multi_proof.clone().map(StateHookSender::new);
move |source: StateChangeSource, state: &EvmState| {
if let Some(sender) = &to_multi_proof {
let _ = sender.send(MultiProofMessage::StateUpdate(source.into(), state.clone()));
}
}
pub fn state_hook(&self) -> Option<impl OnStateHook> {
self.state_root_handle.as_ref().map(|handle| handle.state_hook())
}
/// Returns a clone of the caches used by prewarming
@@ -1364,7 +1432,7 @@ mod tests {
None, // No BAL for test
);
let mut state_hook = handle.state_hook();
let mut state_hook = handle.state_hook().expect("state hook is None");
for (i, update) in state_updates.into_iter().enumerate() {
state_hook.on_state(StateChangeSource::Transaction(i), &update);

View File

@@ -77,10 +77,11 @@ pub enum MultiProofMessage {
/// This should trigger once the block has been executed (after) the last state update has been
/// sent. This triggers the exit condition of the multi proof task.
#[derive(Deref, Debug)]
pub(super) struct StateHookSender(CrossbeamSender<MultiProofMessage>);
pub struct StateHookSender(CrossbeamSender<MultiProofMessage>);
impl StateHookSender {
pub(crate) const fn new(inner: CrossbeamSender<MultiProofMessage>) -> Self {
/// Creates a new [`StateHookSender`] wrapping the given channel sender.
pub const fn new(inner: CrossbeamSender<MultiProofMessage>) -> Self {
Self(inner)
}
}

View File

@@ -27,7 +27,7 @@ use reth_engine_primitives::{
use reth_errors::{BlockExecutionError, ProviderResult};
use reth_evm::{
block::BlockExecutor, execute::ExecutableTxFor, ConfigureEvm, EvmEnvFor, ExecutionCtxFor,
SpecFor,
OnStateHook, SpecFor,
};
use reth_payload_primitives::{
BuiltPayload, InvalidPayloadAttributesError, NewPayloadError, PayloadTypes,
@@ -866,7 +866,9 @@ where
.spawn_blocking_named("receipt-root", move || task_handle.run(receipts_len));
let transaction_count = input.transaction_count();
let executor = executor.with_state_hook(Some(Box::new(handle.state_hook())));
let executor = executor.with_state_hook(
handle.state_hook().map(|hook| Box::new(hook) as Box<dyn OnStateHook>),
);
let execution_start = Instant::now();