Compare commits

...

1 Commits

Author SHA1 Message Date
Derek Cofausper
9d755825fd perf(engine): consolidate post-execution tasks into single PostExecHandle
Replaces the separate receipt-root task (ReceiptRootTaskHandle + oneshot),
hashed-post-state spawn, and transaction-root spawn with a unified
PostExecHandle that coordinates all three background tasks.

- Receipt root worker spawned at construction, receipts streamed incrementally
- Hashed post state and tx root spawned via finish() after execution
- Only aggregate bloom for valid receipt indices (correctness fix)
- Removes receipt_root_task.rs, adds post_exec.rs

Co-Authored-By: YK <46377366+yongkangc@users.noreply.github.com>
2026-03-05 01:28:27 +00:00
4 changed files with 528 additions and 348 deletions

View File

@@ -54,9 +54,9 @@ use tracing::{debug, debug_span, instrument, warn, Span};
pub mod bal;
pub mod multiproof;
pub mod post_exec;
mod preserved_sparse_trie;
pub mod prewarm;
pub mod receipt_root_task;
pub mod sparse_trie;
use preserved_sparse_trie::{PreservedSparseTrie, SharedPreservedSparseTrie};
@@ -175,6 +175,12 @@ where
disable_cache_metrics: config.disable_cache_metrics(),
}
}
/// Creates a new post-execution handle for a block, immediately spawning the
/// single event-driven post-exec background worker.
pub fn post_exec_handle(&self, receipts_len: usize) -> post_exec::PostExecHandle<N::Receipt> {
post_exec::PostExecHandle::new(&self.executor, receipts_len)
}
}
impl<Evm> WaitForCaches for PayloadProcessor<Evm>

View File

@@ -0,0 +1,483 @@
//! Per-block post-execution handle for background post-execution artifact computation.
//!
//! This module provides [`PostExecHandle`], a block-scoped facade that coordinates
//! background tasks:
//!
//! 1. **Receipt root worker** — spawned at construction via [`Runtime::spawn_blocking_named`].
//! Receipts are streamed incrementally during execution; when the channel closes the worker
//! finalizes the receipt trie root and aggregated bloom.
//!
//! 2. **Hashed post-state task** — spawned by [`PostExecHandle::finish`] so it starts immediately
//! after execution, running in parallel with receipt-root finalization.
//!
//! 3. **Transaction root task** — spawned by [`PostExecHandle::finish`] for payload blocks,
//! computing the transaction trie root in parallel with the other tasks.
//!
//! Results are accessed via blocking accessors that wait for the background tasks to complete.
use alloy_eips::Encodable2718;
use alloy_primitives::{Bloom, B256};
use crossbeam_channel::Sender as CrossbeamSender;
use reth_primitives_traits::Receipt;
use reth_tasks::{LazyHandle, Runtime};
use reth_trie::HashedPostState;
use reth_trie_common::ordered_root::OrderedTrieRootEncodedBuilder;
use std::sync::{Arc, OnceLock};
use tracing::error;
/// Receipt with index, ready to be sent to the background task for encoding and trie building.
#[derive(Debug, Clone)]
pub struct IndexedReceipt<R> {
/// The transaction index within the block.
pub index: usize,
/// The receipt.
pub receipt: R,
}
impl<R> IndexedReceipt<R> {
/// Creates a new indexed receipt.
#[inline]
pub const fn new(index: usize, receipt: R) -> Self {
Self { index, receipt }
}
}
/// Block-scoped handle for post-execution background tasks.
///
/// Created once per block via [`PostExecHandle::new`], which immediately spawns a
/// receipt-root background worker. During transaction execution, receipts are streamed
/// via [`push_receipt`](Self::push_receipt). After execution completes, call
/// [`finish`](Self::finish) to close the receipt channel and spawn hashed-post-state
/// and (optionally) transaction-root computation in parallel.
#[must_use]
pub struct PostExecHandle<R> {
tx: Option<CrossbeamSender<IndexedReceipt<R>>>,
receipt_root_bloom: Arc<OnceLock<Option<(B256, Bloom)>>>,
hashed_post_state: Option<LazyHandle<HashedPostState>>,
transaction_root: Option<LazyHandle<B256>>,
executor: Runtime,
}
impl<R> core::fmt::Debug for PostExecHandle<R> {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
f.debug_struct("PostExecHandle").field("finished", &self.tx.is_none()).finish()
}
}
impl<R: Receipt + 'static> PostExecHandle<R> {
/// Creates a new handle and immediately spawns the receipt-root background worker.
///
/// The worker begins waiting for receipts via the crossbeam channel and builds the
/// receipt trie incrementally as they arrive. When the channel closes (via
/// [`finish`](Self::finish) or handle drop), the worker finalizes the receipt root.
pub fn new(executor: &Runtime, receipts_len: usize) -> Self {
let (tx, rx) = crossbeam_channel::unbounded();
let receipt_root_bloom = Arc::new(OnceLock::new());
let receipt_root_bloom_worker = receipt_root_bloom.clone();
// Use spawn_blocking_named for consistent thread naming; ignore the LazyHandle<()> return.
let _ = executor.spawn_blocking_named("receipt-root", move || {
run_receipt_root_worker(rx, receipt_root_bloom_worker, receipts_len);
});
Self {
tx: Some(tx),
receipt_root_bloom,
hashed_post_state: None,
transaction_root: None,
executor: executor.clone(),
}
}
/// Streams one receipt to the background worker.
#[inline]
pub fn push_receipt(&self, index: usize, receipt: R) {
if let Some(tx) = self.tx.as_ref() &&
tx.send(IndexedReceipt::new(index, receipt)).is_err()
{
error!(
target: "engine::tree::payload_processor",
index,
"receipt-root worker dropped before receipt event",
);
}
}
/// Closes the receipt channel and spawns hashed-post-state and (optionally)
/// transaction-root computation.
///
/// Dropping the channel sender signals the receipt-root worker to finalize.
/// The hashed-post-state and transaction-root closures are each spawned on
/// separate threads, running in parallel with receipt-root finalization.
///
/// Pass `None` for `tx_root_fn` when the block is not a payload (no tx root needed).
///
/// Must be called after all receipts have been pushed.
pub fn finish(
&mut self,
hashed_state_fn: impl FnOnce() -> HashedPostState + Send + 'static,
tx_root_fn: Option<impl FnOnce() -> B256 + Send + 'static>,
) {
// Drop receipt channel sender — signals worker to finalize receipt root.
self.tx.take();
// Spawn hashed-post-state computation immediately on a separate thread.
self.hashed_post_state =
Some(self.executor.spawn_blocking_named("hash-post-state", hashed_state_fn));
// Spawn transaction-root computation if this is a payload block.
self.transaction_root =
tx_root_fn.map(|f| self.executor.spawn_blocking_named("payload-tx-root", f));
}
/// Returns the computed receipt root and aggregated logs bloom.
///
/// Blocks until the receipt-root worker completes. Returns `None` if the receipt
/// stream was incomplete (e.g., execution was aborted).
pub fn receipt_root_bloom(&self) -> Option<(B256, Bloom)> {
*self.receipt_root_bloom.wait()
}
/// Returns the computed transaction root, if this was a payload block.
///
/// Blocks until the transaction-root task completes. Returns `None` for non-payload
/// blocks where tx root computation was not requested.
pub fn transaction_root(&self) -> Option<B256> {
self.transaction_root.as_ref().map(|h| *h.get())
}
/// Returns a reference to the computed hashed post state.
///
/// Blocks until the background task completes.
///
/// # Panics
///
/// Panics if [`finish`](Self::finish) was not called before this method.
pub fn hashed_post_state(&self) -> &HashedPostState {
self.hashed_post_state
.as_ref()
.expect("finish() must be called before hashed_post_state()")
.get()
}
/// Extracts the [`LazyHandle<HashedPostState>`] from this handle.
///
/// # Panics
///
/// Panics if [`finish`](Self::finish) was not called.
pub fn into_lazy_hashed_state(&mut self) -> LazyHandle<HashedPostState> {
self.hashed_post_state
.take()
.expect("finish() must be called before into_lazy_hashed_state()")
}
}
impl<R> Drop for PostExecHandle<R> {
fn drop(&mut self) {
// Drop the channel sender if finish() was never called, so the receipt-root
// worker can observe channel closure and terminate.
self.tx.take();
}
}
/// Runs the receipt-root background worker.
///
/// Receives indexed receipts from the channel, incrementally builds the receipt trie,
/// and aggregates the logs bloom. When the channel closes, it finalizes the root and
/// stores the result in the shared [`OnceLock`].
fn run_receipt_root_worker<R: Receipt>(
rx: crossbeam_channel::Receiver<IndexedReceipt<R>>,
receipt_root_bloom: Arc<OnceLock<Option<(B256, Bloom)>>>,
receipts_len: usize,
) {
// RAII guard ensures the OnceLock is set to None if we return early / panic.
struct AbortGuard<'a> {
lock: &'a OnceLock<Option<(B256, Bloom)>>,
disarmed: bool,
}
impl Drop for AbortGuard<'_> {
fn drop(&mut self) {
if !self.disarmed {
let _ = self.lock.set(None);
}
}
}
let mut guard = AbortGuard { lock: &receipt_root_bloom, disarmed: false };
let mut builder = OrderedTrieRootEncodedBuilder::new(receipts_len);
let mut aggregated_bloom = Bloom::ZERO;
let mut encode_buf = Vec::new();
let mut received_count = 0usize;
for indexed_receipt in &rx {
let receipt_with_bloom = indexed_receipt.receipt.with_bloom_ref();
encode_buf.clear();
receipt_with_bloom.encode_2718(&mut encode_buf);
match builder.push(indexed_receipt.index, &encode_buf) {
Ok(()) => {
received_count += 1;
aggregated_bloom |= *receipt_with_bloom.bloom_ref();
}
Err(err) => {
error!(
target: "engine::tree::payload_processor",
index = indexed_receipt.index,
?err,
"Receipt root worker received invalid receipt index, skipping"
);
}
}
}
// Finalize receipt root.
match builder.finalize() {
Ok(root) => {
let _ = receipt_root_bloom.set(Some((root, aggregated_bloom)));
}
Err(_) => {
error!(
target: "engine::tree::payload_processor",
expected = receipts_len,
received = received_count,
"Receipt-root worker received incomplete receipts, execution likely aborted"
);
let _ = receipt_root_bloom.set(None);
return;
}
}
guard.disarmed = true;
}
#[cfg(test)]
mod tests {
use super::*;
use alloy_consensus::{proofs::calculate_receipt_root, TxReceipt};
use alloy_primitives::{Address, Bytes, Log, B256};
use reth_ethereum_primitives::{Receipt, TxType};
fn test_runtime() -> Runtime {
Runtime::test()
}
fn sample_receipts() -> Vec<Receipt> {
vec![
Receipt {
tx_type: TxType::Legacy,
cumulative_gas_used: 21_000,
success: true,
logs: vec![],
},
Receipt {
tx_type: TxType::Eip1559,
cumulative_gas_used: 42_000,
success: true,
logs: vec![Log {
address: Address::ZERO,
data: alloy_primitives::LogData::new_unchecked(vec![B256::ZERO], Bytes::new()),
}],
},
Receipt {
tx_type: TxType::Eip2930,
cumulative_gas_used: 63_000,
success: false,
logs: vec![],
},
]
}
fn expected_root_bloom(receipts: &[Receipt]) -> (B256, Bloom) {
let receipts_with_bloom: Vec<_> = receipts.iter().map(|r| r.with_bloom_ref()).collect();
let root = calculate_receipt_root(&receipts_with_bloom);
let bloom =
receipts_with_bloom.iter().fold(Bloom::ZERO, |acc, receipt| acc | *receipt.bloom_ref());
(root, bloom)
}
#[test]
fn post_exec_handle_computes_receipt_root_and_bloom() {
let rt = test_runtime();
let receipts = sample_receipts();
let (expected_root, expected_bloom) = expected_root_bloom(&receipts);
let mut handle = PostExecHandle::<Receipt>::new(&rt, receipts.len());
for (index, receipt) in receipts.into_iter().enumerate() {
handle.push_receipt(index, receipt);
}
handle.finish(HashedPostState::default, None::<fn() -> B256>);
let (root, bloom) = handle.receipt_root_bloom().unwrap();
assert_eq!(root, expected_root);
assert_eq!(bloom, expected_bloom);
}
#[test]
fn post_exec_handle_handles_out_of_order_receipts() {
let rt = test_runtime();
let receipts = sample_receipts();
let (expected_root, expected_bloom) = expected_root_bloom(&receipts);
let mut handle = PostExecHandle::<Receipt>::new(&rt, receipts.len());
for (index, receipt) in receipts.into_iter().enumerate().rev() {
handle.push_receipt(index, receipt);
}
handle.finish(HashedPostState::default, None::<fn() -> B256>);
let (root, bloom) = handle.receipt_root_bloom().unwrap();
assert_eq!(root, expected_root);
assert_eq!(bloom, expected_bloom);
}
#[test]
fn post_exec_handle_ignores_invalid_index_for_bloom_aggregation() {
let rt = test_runtime();
let valid = Receipt::default();
let invalid = Receipt {
tx_type: TxType::Legacy,
cumulative_gas_used: 21_000,
success: true,
logs: vec![Log {
address: Address::ZERO,
data: alloy_primitives::LogData::new_unchecked(vec![B256::ZERO], Bytes::new()),
}],
};
let expected = expected_root_bloom(core::slice::from_ref(&valid));
let mut handle = PostExecHandle::<Receipt>::new(&rt, 1);
handle.push_receipt(0, valid);
handle.push_receipt(999, invalid);
handle.finish(HashedPostState::default, None::<fn() -> B256>);
assert_eq!(handle.receipt_root_bloom(), Some(expected));
}
#[test]
fn post_exec_handle_returns_none_for_incomplete_stream() {
let rt = test_runtime();
let mut handle = PostExecHandle::<Receipt>::new(&rt, 2);
handle.push_receipt(0, Receipt::default());
// Finish with only 1 of 2 receipts — root should be None.
handle.finish(HashedPostState::default, None::<fn() -> B256>);
assert!(handle.receipt_root_bloom().is_none());
}
#[test]
fn post_exec_handle_with_hashed_post_state() {
let rt = test_runtime();
let mut handle = PostExecHandle::<Receipt>::new(&rt, 0);
let expected = HashedPostState::default();
handle.finish(HashedPostState::default, None::<fn() -> B256>);
assert_eq!(handle.hashed_post_state(), &expected);
}
#[test]
fn post_exec_handle_with_transaction_root() {
let rt = test_runtime();
let expected_root = B256::repeat_byte(0x42);
let mut handle = PostExecHandle::<Receipt>::new(&rt, 0);
handle.finish(HashedPostState::default, Some(move || expected_root));
assert_eq!(handle.transaction_root(), Some(expected_root));
}
#[test]
fn post_exec_handle_without_transaction_root() {
let rt = test_runtime();
let mut handle = PostExecHandle::<Receipt>::new(&rt, 0);
handle.finish(HashedPostState::default, None::<fn() -> B256>);
assert_eq!(handle.transaction_root(), None);
}
#[test]
fn post_exec_handle_parallel_blocks() {
let rt = test_runtime();
let receipts_a = sample_receipts();
let (expected_root_a, expected_bloom_a) = expected_root_bloom(&receipts_a);
let receipts_b = vec![Receipt::default(); 2];
let (expected_root_b, expected_bloom_b) = expected_root_bloom(&receipts_b);
let mut handle_a = PostExecHandle::<Receipt>::new(&rt, receipts_a.len());
let mut handle_b = PostExecHandle::<Receipt>::new(&rt, receipts_b.len());
for (index, receipt) in receipts_a.into_iter().enumerate() {
handle_a.push_receipt(index, receipt);
}
for (index, receipt) in receipts_b.into_iter().enumerate() {
handle_b.push_receipt(index, receipt);
}
handle_a.finish(HashedPostState::default, None::<fn() -> B256>);
handle_b.finish(HashedPostState::default, None::<fn() -> B256>);
let (root_a, bloom_a) = handle_a.receipt_root_bloom().unwrap();
let (root_b, bloom_b) = handle_b.receipt_root_bloom().unwrap();
assert_eq!(root_a, expected_root_a);
assert_eq!(bloom_a, expected_bloom_a);
assert_eq!(root_b, expected_root_b);
assert_eq!(bloom_b, expected_bloom_b);
}
#[test]
fn post_exec_handle_aborted_block_then_next_succeeds() {
let rt = test_runtime();
// First block: aborted (dropped without finishing all receipts)
let handle = PostExecHandle::<Receipt>::new(&rt, 2);
handle.push_receipt(0, Receipt::default());
drop(handle);
// Second block: succeeds
let mut handle = PostExecHandle::<Receipt>::new(&rt, 1);
handle.push_receipt(0, Receipt::default());
handle.finish(HashedPostState::default, None::<fn() -> B256>);
assert!(handle.receipt_root_bloom().is_some());
}
#[test]
fn lazy_hashed_post_state_get_and_try_into_inner() {
let rt = test_runtime();
let mut handle = PostExecHandle::<Receipt>::new(&rt, 0);
handle.finish(HashedPostState::default, None::<fn() -> B256>);
let lazy = handle.into_lazy_hashed_state();
// handle is partially consumed but Drop is safe (hashed_post_state is now None)
drop(handle);
assert_eq!(lazy.get(), &HashedPostState::default());
let inner = lazy.try_into_inner().unwrap();
assert_eq!(inner, HashedPostState::default());
}
#[test]
fn lazy_hashed_post_state_clone_prevents_try_into_inner() {
let rt = test_runtime();
let mut handle = PostExecHandle::<Receipt>::new(&rt, 0);
handle.finish(HashedPostState::default, None::<fn() -> B256>);
let lazy = handle.into_lazy_hashed_state();
drop(handle);
let _clone = lazy.clone();
// try_into_inner fails because there are multiple Arc references.
let lazy = lazy.try_into_inner().unwrap_err();
assert_eq!(lazy.get(), &HashedPostState::default());
}
}

View File

@@ -1,281 +0,0 @@
//! Receipt root computation in a background task.
//!
//! This module provides a streaming receipt root builder that computes the receipt trie root
//! in a background thread. Receipts are sent via a channel with their index, and for each
//! receipt received, the builder incrementally flushes leaves to the underlying
//! [`OrderedTrieRootEncodedBuilder`] when possible. When the channel closes, the task returns the
//! computed root.
use alloy_eips::Encodable2718;
use alloy_primitives::{Bloom, B256};
use crossbeam_channel::Receiver;
use reth_primitives_traits::Receipt;
use reth_trie_common::ordered_root::OrderedTrieRootEncodedBuilder;
use tokio::sync::oneshot;
use tracing::debug_span;
/// Receipt with index, ready to be sent to the background task for encoding and trie building.
#[derive(Debug, Clone)]
pub struct IndexedReceipt<R> {
/// The transaction index within the block.
pub index: usize,
/// The receipt.
pub receipt: R,
}
impl<R> IndexedReceipt<R> {
/// Creates a new indexed receipt.
#[inline]
pub const fn new(index: usize, receipt: R) -> Self {
Self { index, receipt }
}
}
/// Handle for running the receipt root computation in a background task.
///
/// This struct holds the channels needed to receive receipts and send the result.
/// Use [`Self::run`] to execute the computation (typically in a spawned blocking task).
#[derive(Debug)]
pub struct ReceiptRootTaskHandle<R> {
/// Receiver for indexed receipts.
receipt_rx: Receiver<IndexedReceipt<R>>,
/// Sender for the computed result.
result_tx: oneshot::Sender<(B256, Bloom)>,
}
impl<R: Receipt> ReceiptRootTaskHandle<R> {
/// Creates a new handle from the receipt receiver and result sender channels.
pub const fn new(
receipt_rx: Receiver<IndexedReceipt<R>>,
result_tx: oneshot::Sender<(B256, Bloom)>,
) -> Self {
Self { receipt_rx, result_tx }
}
/// Runs the receipt root computation, consuming the handle.
///
/// This method receives indexed receipts from the channel, encodes them,
/// and builds the trie incrementally. When all receipts have been received
/// (channel closed), it sends the result through the oneshot channel.
///
/// This is designed to be called inside a blocking task (e.g., via
/// `executor.spawn_blocking(move || handle.run(receipts_len))`).
///
/// # Arguments
///
/// * `receipts_len` - The total number of receipts expected. This is needed to correctly order
/// the trie keys according to RLP encoding rules.
pub fn run(self, receipts_len: usize) {
let _span = debug_span!(
target: "engine::tree::payload_processor",
"receipt_root",
receipts_len,
)
.entered();
let mut builder = OrderedTrieRootEncodedBuilder::new(receipts_len);
let mut aggregated_bloom = Bloom::ZERO;
let mut encode_buf = Vec::new();
let mut received_count = 0usize;
for indexed_receipt in self.receipt_rx {
let receipt_with_bloom = indexed_receipt.receipt.with_bloom_ref();
encode_buf.clear();
receipt_with_bloom.encode_2718(&mut encode_buf);
aggregated_bloom |= *receipt_with_bloom.bloom_ref();
match builder.push(indexed_receipt.index, &encode_buf) {
Ok(()) => {
received_count += 1;
}
Err(err) => {
// If a duplicate or out-of-bounds index is streamed, skip it and
// fall back to computing the receipt root from the full receipts
// vector later.
tracing::error!(
target: "engine::tree::payload_processor",
index = indexed_receipt.index,
?err,
"Receipt root task received invalid receipt index, skipping"
);
}
}
}
let Ok(root) = builder.finalize() else {
// Finalize fails if we didn't receive exactly `receipts_len` receipts. This can
// happen if execution was aborted early (e.g., invalid transaction encountered).
// We return without sending a result, allowing the caller to handle the abort.
tracing::error!(
target: "engine::tree::payload_processor",
expected = receipts_len,
received = received_count,
"Receipt root task received incomplete receipts, execution likely aborted"
);
return;
};
let _ = self.result_tx.send((root, aggregated_bloom));
}
}
#[cfg(test)]
mod tests {
use super::*;
use alloy_consensus::{proofs::calculate_receipt_root, TxReceipt};
use alloy_primitives::{b256, hex, Address, Bytes, Log};
use crossbeam_channel::bounded;
use reth_ethereum_primitives::{Receipt, TxType};
#[tokio::test]
async fn test_receipt_root_task_empty() {
let (_tx, rx) = bounded::<IndexedReceipt<Receipt>>(1);
let (result_tx, result_rx) = oneshot::channel();
drop(_tx);
let handle = ReceiptRootTaskHandle::new(rx, result_tx);
tokio::task::spawn_blocking(move || handle.run(0)).await.unwrap();
let (root, bloom) = result_rx.await.unwrap();
// Empty trie root
assert_eq!(root, reth_trie_common::EMPTY_ROOT_HASH);
assert_eq!(bloom, Bloom::ZERO);
}
#[tokio::test]
async fn test_receipt_root_task_single_receipt() {
let receipts: Vec<Receipt> = vec![Receipt::default()];
let (tx, rx) = bounded(1);
let (result_tx, result_rx) = oneshot::channel();
let receipts_len = receipts.len();
let handle = ReceiptRootTaskHandle::new(rx, result_tx);
let join_handle = tokio::task::spawn_blocking(move || handle.run(receipts_len));
for (i, receipt) in receipts.clone().into_iter().enumerate() {
tx.send(IndexedReceipt::new(i, receipt)).unwrap();
}
drop(tx);
join_handle.await.unwrap();
let (root, _bloom) = result_rx.await.unwrap();
// Verify against the standard calculation
let receipts_with_bloom: Vec<_> = receipts.iter().map(|r| r.with_bloom_ref()).collect();
let expected_root = calculate_receipt_root(&receipts_with_bloom);
assert_eq!(root, expected_root);
}
#[tokio::test]
async fn test_receipt_root_task_multiple_receipts() {
let receipts: Vec<Receipt> = vec![Receipt::default(); 5];
let (tx, rx) = bounded(4);
let (result_tx, result_rx) = oneshot::channel();
let receipts_len = receipts.len();
let handle = ReceiptRootTaskHandle::new(rx, result_tx);
let join_handle = tokio::task::spawn_blocking(move || handle.run(receipts_len));
for (i, receipt) in receipts.into_iter().enumerate() {
tx.send(IndexedReceipt::new(i, receipt)).unwrap();
}
drop(tx);
join_handle.await.unwrap();
let (root, bloom) = result_rx.await.unwrap();
// Verify against expected values from existing test
assert_eq!(
root,
b256!("0x61353b4fb714dc1fccacbf7eafc4273e62f3d1eed716fe41b2a0cd2e12c63ebc")
);
assert_eq!(
bloom,
Bloom::from(hex!("00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"))
);
}
#[tokio::test]
async fn test_receipt_root_matches_standard_calculation() {
// Create some receipts with actual data
let receipts = vec![
Receipt {
tx_type: TxType::Legacy,
cumulative_gas_used: 21000,
success: true,
logs: vec![],
},
Receipt {
tx_type: TxType::Eip1559,
cumulative_gas_used: 42000,
success: true,
logs: vec![Log {
address: Address::ZERO,
data: alloy_primitives::LogData::new_unchecked(vec![B256::ZERO], Bytes::new()),
}],
},
Receipt {
tx_type: TxType::Eip2930,
cumulative_gas_used: 63000,
success: false,
logs: vec![],
},
];
// Calculate expected values first (before we move receipts)
let receipts_with_bloom: Vec<_> = receipts.iter().map(|r| r.with_bloom_ref()).collect();
let expected_root = calculate_receipt_root(&receipts_with_bloom);
let expected_bloom =
receipts_with_bloom.iter().fold(Bloom::ZERO, |bloom, r| bloom | r.bloom_ref());
// Calculate using the task
let (tx, rx) = bounded(4);
let (result_tx, result_rx) = oneshot::channel();
let receipts_len = receipts.len();
let handle = ReceiptRootTaskHandle::new(rx, result_tx);
let join_handle = tokio::task::spawn_blocking(move || handle.run(receipts_len));
for (i, receipt) in receipts.into_iter().enumerate() {
tx.send(IndexedReceipt::new(i, receipt)).unwrap();
}
drop(tx);
join_handle.await.unwrap();
let (task_root, task_bloom) = result_rx.await.unwrap();
assert_eq!(task_root, expected_root);
assert_eq!(task_bloom, expected_bloom);
}
#[tokio::test]
async fn test_receipt_root_task_out_of_order() {
let receipts: Vec<Receipt> = vec![Receipt::default(); 5];
// Calculate expected values first (before we move receipts)
let receipts_with_bloom: Vec<_> = receipts.iter().map(|r| r.with_bloom_ref()).collect();
let expected_root = calculate_receipt_root(&receipts_with_bloom);
let (tx, rx) = bounded(4);
let (result_tx, result_rx) = oneshot::channel();
let receipts_len = receipts.len();
let handle = ReceiptRootTaskHandle::new(rx, result_tx);
let join_handle = tokio::task::spawn_blocking(move || handle.run(receipts_len));
// Send in reverse order to test out-of-order handling
for (i, receipt) in receipts.into_iter().enumerate().rev() {
tx.send(IndexedReceipt::new(i, receipt)).unwrap();
}
drop(tx);
join_handle.await.unwrap();
let (root, _bloom) = result_rx.await.unwrap();
assert_eq!(root, expected_root);
}
}

View File

@@ -18,7 +18,7 @@ use alloy_primitives::B256;
#[cfg(feature = "trie-debug")]
use reth_trie_sparse::debug_recorder::TrieDebugRecorder;
use crate::tree::payload_processor::receipt_root_task::{IndexedReceipt, ReceiptRootTaskHandle};
use crate::tree::payload_processor::post_exec::PostExecHandle;
use reth_chain_state::{CanonicalInMemoryState, DeferredTrieData, ExecutedBlock, LazyOverlay};
use reth_consensus::{ConsensusError, FullConsensus, ReceiptRootBloom};
use reth_engine_primitives::{
@@ -497,9 +497,9 @@ where
}
// Execute the block and handle any execution errors.
// The receipt root task is spawned before execution and receives receipts incrementally
// as transactions complete, allowing parallel computation during execution.
let (output, senders, receipt_root_rx) =
// The post-exec handle manages receipt root computation in a background worker,
// receiving receipts incrementally as transactions complete.
let (output, senders, mut post_exec) =
match self.execute_block(state_provider, env, &input, &mut handle) {
Ok(output) => output,
Err(err) => return self.handle_execution_error(input, err, &parent_block),
@@ -517,59 +517,40 @@ where
// needed. This frees up resources while state root computation continues.
let valid_block_tx = handle.terminate_caching(Some(output.clone()));
// Spawn hashed post state computation in background so it runs concurrently with
// block conversion and receipt root computation. This is a pure CPU-bound task
// (keccak256 hashing of all changed addresses and storage slots).
// Spawn hashed post state and (for payloads) transaction root on separate threads,
// in parallel with receipt-root finalization. Dropping the channel closes the receipt
// stream.
let hashed_state_output = output.clone();
let hashed_state_provider = self.provider.clone();
let hashed_state: LazyHashedPostState =
self.payload_processor.executor().spawn_blocking_named("hash-post-state", move || {
let block = convert_to_block(input)?;
let tx_root_fn = is_payload.then(|| {
let block = block.clone();
let parent_span = Span::current();
let num_hash = block.num_hash();
move || {
let _span =
debug_span!(target: "engine::tree::payload_validator", parent: parent_span, "payload_tx_root", block = ?num_hash)
.entered();
block.body().calculate_tx_root()
}
});
post_exec.finish(
move || {
let _span = debug_span!(
target: "engine::tree::payload_validator",
"hashed_post_state",
)
.entered();
hashed_state_provider.hashed_post_state(&hashed_state_output.state)
});
},
tx_root_fn,
);
let block = convert_to_block(input)?;
let transaction_root = is_payload.then(|| {
let block = block.clone();
let parent_span = Span::current();
let num_hash = block.num_hash();
self.payload_processor.executor().spawn_blocking_named("payload-tx-root", move || {
let _span =
debug_span!(target: "engine::tree::payload_validator", parent: parent_span, "payload_tx_root", block = ?num_hash)
.entered();
block.body().calculate_tx_root()
})
});
let block = block.with_senders(senders);
// Wait for the receipt root computation to complete.
let receipt_root_bloom = {
let _enter = debug_span!(
target: "engine::tree::payload_validator",
"wait_receipt_root",
)
.entered();
receipt_root_rx
.blocking_recv()
.inspect_err(|_| {
tracing::error!(
target: "engine::tree::payload_validator",
"Receipt root task dropped sender without result, receipt root calculation likely aborted"
);
})
.ok()
};
let transaction_root = transaction_root.map(|handle| {
let _span =
debug_span!(target: "engine::tree::payload_validator", "wait_payload_tx_root")
.entered();
handle.try_into_inner().expect("sole handle")
});
let receipt_root_bloom = post_exec.receipt_root_bloom();
let transaction_root = post_exec.transaction_root();
let hashed_state: LazyHashedPostState = post_exec.into_lazy_hashed_state();
let hashed_state = ensure_ok_post_block!(
self.validate_post_execution(
@@ -812,11 +793,7 @@ where
input: &BlockOrPayload<T>,
handle: &mut PayloadHandle<impl ExecutableTxFor<Evm>, Err, N::Receipt>,
) -> Result<
(
BlockExecutionOutput<N::Receipt>,
Vec<Address>,
tokio::sync::oneshot::Receiver<(B256, alloy_primitives::Bloom)>,
),
(BlockExecutionOutput<N::Receipt>, Vec<Address>, PostExecHandle<N::Receipt>),
InsertBlockErrorKind,
>
where
@@ -865,15 +842,10 @@ where
);
}
// Spawn background task to compute receipt root and logs bloom incrementally.
// Unbounded channel is used since tx count bounds capacity anyway (max ~30k txs per block).
// Create a unified post-exec handle that manages receipt root, hashed post state,
// and transaction root computation in parallel background tasks.
let receipts_len = input.transaction_count();
let (receipt_tx, receipt_rx) = crossbeam_channel::unbounded();
let (result_tx, result_rx) = tokio::sync::oneshot::channel();
let task_handle = ReceiptRootTaskHandle::new(receipt_rx, result_tx);
self.payload_processor
.executor()
.spawn_blocking_named("receipt-root", move || task_handle.run(receipts_len));
let post_exec = self.payload_processor.post_exec_handle(receipts_len);
let transaction_count = input.transaction_count();
let executed_tx_index = Arc::clone(handle.executed_tx_index());
@@ -888,10 +860,9 @@ where
executor,
transaction_count,
handle.iter_transactions(),
&receipt_tx,
&post_exec,
&executed_tx_index,
)?;
drop(receipt_tx);
// Finish execution and get the result
let post_exec_start = Instant::now();
@@ -911,10 +882,10 @@ where
self.metrics.record_block_execution_gas_bucket(output.result.gas_used, execution_duration);
debug!(target: "engine::tree::payload_validator", elapsed = ?execution_duration, "Executed block");
Ok((output, senders, result_rx))
Ok((output, senders, post_exec))
}
/// Executes transactions and collects senders, streaming receipts to a background task.
/// Executes transactions and collects senders, streaming receipts to the post-exec worker.
///
/// This method handles:
/// - Applying pre-execution changes (e.g., beacon root updates)
@@ -928,7 +899,7 @@ where
mut executor: E,
transaction_count: usize,
transactions: impl Iterator<Item = Result<Tx, Err>>,
receipt_tx: &crossbeam_channel::Sender<IndexedReceipt<N::Receipt>>,
post_exec: &PostExecHandle<N::Receipt>,
executed_tx_index: &AtomicUsize,
) -> Result<(E, Vec<Address>), BlockExecutionError>
where
@@ -982,10 +953,11 @@ where
let current_len = executor.receipts().len();
if current_len > last_sent_len {
last_sent_len = current_len;
// Send the latest receipt to the background task for incremental root computation.
// Stream the latest receipt to the post-exec worker for incremental root
// computation.
if let Some(receipt) = executor.receipts().last() {
let tx_index = current_len - 1;
let _ = receipt_tx.send(IndexedReceipt::new(tx_index, receipt.clone()));
post_exec.push_receipt(tx_index, receipt.clone());
}
}
}