mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-04-30 03:01:58 -04:00
Compare commits
1 Commits
main
...
yk/post-ex
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9d755825fd |
@@ -54,9 +54,9 @@ use tracing::{debug, debug_span, instrument, warn, Span};
|
||||
|
||||
pub mod bal;
|
||||
pub mod multiproof;
|
||||
pub mod post_exec;
|
||||
mod preserved_sparse_trie;
|
||||
pub mod prewarm;
|
||||
pub mod receipt_root_task;
|
||||
pub mod sparse_trie;
|
||||
|
||||
use preserved_sparse_trie::{PreservedSparseTrie, SharedPreservedSparseTrie};
|
||||
@@ -175,6 +175,12 @@ where
|
||||
disable_cache_metrics: config.disable_cache_metrics(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Creates a new post-execution handle for a block, immediately spawning the
|
||||
/// single event-driven post-exec background worker.
|
||||
pub fn post_exec_handle(&self, receipts_len: usize) -> post_exec::PostExecHandle<N::Receipt> {
|
||||
post_exec::PostExecHandle::new(&self.executor, receipts_len)
|
||||
}
|
||||
}
|
||||
|
||||
impl<Evm> WaitForCaches for PayloadProcessor<Evm>
|
||||
|
||||
483
crates/engine/tree/src/tree/payload_processor/post_exec.rs
Normal file
483
crates/engine/tree/src/tree/payload_processor/post_exec.rs
Normal file
@@ -0,0 +1,483 @@
|
||||
//! Per-block post-execution handle for background post-execution artifact computation.
|
||||
//!
|
||||
//! This module provides [`PostExecHandle`], a block-scoped facade that coordinates
|
||||
//! background tasks:
|
||||
//!
|
||||
//! 1. **Receipt root worker** — spawned at construction via [`Runtime::spawn_blocking_named`].
|
||||
//! Receipts are streamed incrementally during execution; when the channel closes the worker
|
||||
//! finalizes the receipt trie root and aggregated bloom.
|
||||
//!
|
||||
//! 2. **Hashed post-state task** — spawned by [`PostExecHandle::finish`] so it starts immediately
|
||||
//! after execution, running in parallel with receipt-root finalization.
|
||||
//!
|
||||
//! 3. **Transaction root task** — spawned by [`PostExecHandle::finish`] for payload blocks,
|
||||
//! computing the transaction trie root in parallel with the other tasks.
|
||||
//!
|
||||
//! Results are accessed via blocking accessors that wait for the background tasks to complete.
|
||||
|
||||
use alloy_eips::Encodable2718;
|
||||
use alloy_primitives::{Bloom, B256};
|
||||
use crossbeam_channel::Sender as CrossbeamSender;
|
||||
use reth_primitives_traits::Receipt;
|
||||
use reth_tasks::{LazyHandle, Runtime};
|
||||
use reth_trie::HashedPostState;
|
||||
use reth_trie_common::ordered_root::OrderedTrieRootEncodedBuilder;
|
||||
use std::sync::{Arc, OnceLock};
|
||||
use tracing::error;
|
||||
|
||||
/// Receipt with index, ready to be sent to the background task for encoding and trie building.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct IndexedReceipt<R> {
|
||||
/// The transaction index within the block.
|
||||
pub index: usize,
|
||||
/// The receipt.
|
||||
pub receipt: R,
|
||||
}
|
||||
|
||||
impl<R> IndexedReceipt<R> {
|
||||
/// Creates a new indexed receipt.
|
||||
#[inline]
|
||||
pub const fn new(index: usize, receipt: R) -> Self {
|
||||
Self { index, receipt }
|
||||
}
|
||||
}
|
||||
|
||||
/// Block-scoped handle for post-execution background tasks.
|
||||
///
|
||||
/// Created once per block via [`PostExecHandle::new`], which immediately spawns a
|
||||
/// receipt-root background worker. During transaction execution, receipts are streamed
|
||||
/// via [`push_receipt`](Self::push_receipt). After execution completes, call
|
||||
/// [`finish`](Self::finish) to close the receipt channel and spawn hashed-post-state
|
||||
/// and (optionally) transaction-root computation in parallel.
|
||||
#[must_use]
|
||||
pub struct PostExecHandle<R> {
|
||||
tx: Option<CrossbeamSender<IndexedReceipt<R>>>,
|
||||
receipt_root_bloom: Arc<OnceLock<Option<(B256, Bloom)>>>,
|
||||
hashed_post_state: Option<LazyHandle<HashedPostState>>,
|
||||
transaction_root: Option<LazyHandle<B256>>,
|
||||
executor: Runtime,
|
||||
}
|
||||
|
||||
impl<R> core::fmt::Debug for PostExecHandle<R> {
|
||||
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
|
||||
f.debug_struct("PostExecHandle").field("finished", &self.tx.is_none()).finish()
|
||||
}
|
||||
}
|
||||
|
||||
impl<R: Receipt + 'static> PostExecHandle<R> {
|
||||
/// Creates a new handle and immediately spawns the receipt-root background worker.
|
||||
///
|
||||
/// The worker begins waiting for receipts via the crossbeam channel and builds the
|
||||
/// receipt trie incrementally as they arrive. When the channel closes (via
|
||||
/// [`finish`](Self::finish) or handle drop), the worker finalizes the receipt root.
|
||||
pub fn new(executor: &Runtime, receipts_len: usize) -> Self {
|
||||
let (tx, rx) = crossbeam_channel::unbounded();
|
||||
let receipt_root_bloom = Arc::new(OnceLock::new());
|
||||
let receipt_root_bloom_worker = receipt_root_bloom.clone();
|
||||
|
||||
// Use spawn_blocking_named for consistent thread naming; ignore the LazyHandle<()> return.
|
||||
let _ = executor.spawn_blocking_named("receipt-root", move || {
|
||||
run_receipt_root_worker(rx, receipt_root_bloom_worker, receipts_len);
|
||||
});
|
||||
|
||||
Self {
|
||||
tx: Some(tx),
|
||||
receipt_root_bloom,
|
||||
hashed_post_state: None,
|
||||
transaction_root: None,
|
||||
executor: executor.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Streams one receipt to the background worker.
|
||||
#[inline]
|
||||
pub fn push_receipt(&self, index: usize, receipt: R) {
|
||||
if let Some(tx) = self.tx.as_ref() &&
|
||||
tx.send(IndexedReceipt::new(index, receipt)).is_err()
|
||||
{
|
||||
error!(
|
||||
target: "engine::tree::payload_processor",
|
||||
index,
|
||||
"receipt-root worker dropped before receipt event",
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
/// Closes the receipt channel and spawns hashed-post-state and (optionally)
|
||||
/// transaction-root computation.
|
||||
///
|
||||
/// Dropping the channel sender signals the receipt-root worker to finalize.
|
||||
/// The hashed-post-state and transaction-root closures are each spawned on
|
||||
/// separate threads, running in parallel with receipt-root finalization.
|
||||
///
|
||||
/// Pass `None` for `tx_root_fn` when the block is not a payload (no tx root needed).
|
||||
///
|
||||
/// Must be called after all receipts have been pushed.
|
||||
pub fn finish(
|
||||
&mut self,
|
||||
hashed_state_fn: impl FnOnce() -> HashedPostState + Send + 'static,
|
||||
tx_root_fn: Option<impl FnOnce() -> B256 + Send + 'static>,
|
||||
) {
|
||||
// Drop receipt channel sender — signals worker to finalize receipt root.
|
||||
self.tx.take();
|
||||
|
||||
// Spawn hashed-post-state computation immediately on a separate thread.
|
||||
self.hashed_post_state =
|
||||
Some(self.executor.spawn_blocking_named("hash-post-state", hashed_state_fn));
|
||||
|
||||
// Spawn transaction-root computation if this is a payload block.
|
||||
self.transaction_root =
|
||||
tx_root_fn.map(|f| self.executor.spawn_blocking_named("payload-tx-root", f));
|
||||
}
|
||||
|
||||
/// Returns the computed receipt root and aggregated logs bloom.
|
||||
///
|
||||
/// Blocks until the receipt-root worker completes. Returns `None` if the receipt
|
||||
/// stream was incomplete (e.g., execution was aborted).
|
||||
pub fn receipt_root_bloom(&self) -> Option<(B256, Bloom)> {
|
||||
*self.receipt_root_bloom.wait()
|
||||
}
|
||||
|
||||
/// Returns the computed transaction root, if this was a payload block.
|
||||
///
|
||||
/// Blocks until the transaction-root task completes. Returns `None` for non-payload
|
||||
/// blocks where tx root computation was not requested.
|
||||
pub fn transaction_root(&self) -> Option<B256> {
|
||||
self.transaction_root.as_ref().map(|h| *h.get())
|
||||
}
|
||||
|
||||
/// Returns a reference to the computed hashed post state.
|
||||
///
|
||||
/// Blocks until the background task completes.
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// Panics if [`finish`](Self::finish) was not called before this method.
|
||||
pub fn hashed_post_state(&self) -> &HashedPostState {
|
||||
self.hashed_post_state
|
||||
.as_ref()
|
||||
.expect("finish() must be called before hashed_post_state()")
|
||||
.get()
|
||||
}
|
||||
|
||||
/// Extracts the [`LazyHandle<HashedPostState>`] from this handle.
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// Panics if [`finish`](Self::finish) was not called.
|
||||
pub fn into_lazy_hashed_state(&mut self) -> LazyHandle<HashedPostState> {
|
||||
self.hashed_post_state
|
||||
.take()
|
||||
.expect("finish() must be called before into_lazy_hashed_state()")
|
||||
}
|
||||
}
|
||||
|
||||
impl<R> Drop for PostExecHandle<R> {
|
||||
fn drop(&mut self) {
|
||||
// Drop the channel sender if finish() was never called, so the receipt-root
|
||||
// worker can observe channel closure and terminate.
|
||||
self.tx.take();
|
||||
}
|
||||
}
|
||||
|
||||
/// Runs the receipt-root background worker.
|
||||
///
|
||||
/// Receives indexed receipts from the channel, incrementally builds the receipt trie,
|
||||
/// and aggregates the logs bloom. When the channel closes, it finalizes the root and
|
||||
/// stores the result in the shared [`OnceLock`].
|
||||
fn run_receipt_root_worker<R: Receipt>(
|
||||
rx: crossbeam_channel::Receiver<IndexedReceipt<R>>,
|
||||
receipt_root_bloom: Arc<OnceLock<Option<(B256, Bloom)>>>,
|
||||
receipts_len: usize,
|
||||
) {
|
||||
// RAII guard ensures the OnceLock is set to None if we return early / panic.
|
||||
struct AbortGuard<'a> {
|
||||
lock: &'a OnceLock<Option<(B256, Bloom)>>,
|
||||
disarmed: bool,
|
||||
}
|
||||
impl Drop for AbortGuard<'_> {
|
||||
fn drop(&mut self) {
|
||||
if !self.disarmed {
|
||||
let _ = self.lock.set(None);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let mut guard = AbortGuard { lock: &receipt_root_bloom, disarmed: false };
|
||||
let mut builder = OrderedTrieRootEncodedBuilder::new(receipts_len);
|
||||
let mut aggregated_bloom = Bloom::ZERO;
|
||||
let mut encode_buf = Vec::new();
|
||||
let mut received_count = 0usize;
|
||||
|
||||
for indexed_receipt in &rx {
|
||||
let receipt_with_bloom = indexed_receipt.receipt.with_bloom_ref();
|
||||
|
||||
encode_buf.clear();
|
||||
receipt_with_bloom.encode_2718(&mut encode_buf);
|
||||
|
||||
match builder.push(indexed_receipt.index, &encode_buf) {
|
||||
Ok(()) => {
|
||||
received_count += 1;
|
||||
aggregated_bloom |= *receipt_with_bloom.bloom_ref();
|
||||
}
|
||||
Err(err) => {
|
||||
error!(
|
||||
target: "engine::tree::payload_processor",
|
||||
index = indexed_receipt.index,
|
||||
?err,
|
||||
"Receipt root worker received invalid receipt index, skipping"
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Finalize receipt root.
|
||||
match builder.finalize() {
|
||||
Ok(root) => {
|
||||
let _ = receipt_root_bloom.set(Some((root, aggregated_bloom)));
|
||||
}
|
||||
Err(_) => {
|
||||
error!(
|
||||
target: "engine::tree::payload_processor",
|
||||
expected = receipts_len,
|
||||
received = received_count,
|
||||
"Receipt-root worker received incomplete receipts, execution likely aborted"
|
||||
);
|
||||
let _ = receipt_root_bloom.set(None);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
guard.disarmed = true;
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use alloy_consensus::{proofs::calculate_receipt_root, TxReceipt};
|
||||
use alloy_primitives::{Address, Bytes, Log, B256};
|
||||
use reth_ethereum_primitives::{Receipt, TxType};
|
||||
|
||||
fn test_runtime() -> Runtime {
|
||||
Runtime::test()
|
||||
}
|
||||
|
||||
fn sample_receipts() -> Vec<Receipt> {
|
||||
vec![
|
||||
Receipt {
|
||||
tx_type: TxType::Legacy,
|
||||
cumulative_gas_used: 21_000,
|
||||
success: true,
|
||||
logs: vec![],
|
||||
},
|
||||
Receipt {
|
||||
tx_type: TxType::Eip1559,
|
||||
cumulative_gas_used: 42_000,
|
||||
success: true,
|
||||
logs: vec![Log {
|
||||
address: Address::ZERO,
|
||||
data: alloy_primitives::LogData::new_unchecked(vec![B256::ZERO], Bytes::new()),
|
||||
}],
|
||||
},
|
||||
Receipt {
|
||||
tx_type: TxType::Eip2930,
|
||||
cumulative_gas_used: 63_000,
|
||||
success: false,
|
||||
logs: vec![],
|
||||
},
|
||||
]
|
||||
}
|
||||
|
||||
fn expected_root_bloom(receipts: &[Receipt]) -> (B256, Bloom) {
|
||||
let receipts_with_bloom: Vec<_> = receipts.iter().map(|r| r.with_bloom_ref()).collect();
|
||||
let root = calculate_receipt_root(&receipts_with_bloom);
|
||||
let bloom =
|
||||
receipts_with_bloom.iter().fold(Bloom::ZERO, |acc, receipt| acc | *receipt.bloom_ref());
|
||||
(root, bloom)
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn post_exec_handle_computes_receipt_root_and_bloom() {
|
||||
let rt = test_runtime();
|
||||
|
||||
let receipts = sample_receipts();
|
||||
let (expected_root, expected_bloom) = expected_root_bloom(&receipts);
|
||||
|
||||
let mut handle = PostExecHandle::<Receipt>::new(&rt, receipts.len());
|
||||
for (index, receipt) in receipts.into_iter().enumerate() {
|
||||
handle.push_receipt(index, receipt);
|
||||
}
|
||||
handle.finish(HashedPostState::default, None::<fn() -> B256>);
|
||||
|
||||
let (root, bloom) = handle.receipt_root_bloom().unwrap();
|
||||
assert_eq!(root, expected_root);
|
||||
assert_eq!(bloom, expected_bloom);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn post_exec_handle_handles_out_of_order_receipts() {
|
||||
let rt = test_runtime();
|
||||
|
||||
let receipts = sample_receipts();
|
||||
let (expected_root, expected_bloom) = expected_root_bloom(&receipts);
|
||||
|
||||
let mut handle = PostExecHandle::<Receipt>::new(&rt, receipts.len());
|
||||
for (index, receipt) in receipts.into_iter().enumerate().rev() {
|
||||
handle.push_receipt(index, receipt);
|
||||
}
|
||||
handle.finish(HashedPostState::default, None::<fn() -> B256>);
|
||||
|
||||
let (root, bloom) = handle.receipt_root_bloom().unwrap();
|
||||
assert_eq!(root, expected_root);
|
||||
assert_eq!(bloom, expected_bloom);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn post_exec_handle_ignores_invalid_index_for_bloom_aggregation() {
|
||||
let rt = test_runtime();
|
||||
|
||||
let valid = Receipt::default();
|
||||
let invalid = Receipt {
|
||||
tx_type: TxType::Legacy,
|
||||
cumulative_gas_used: 21_000,
|
||||
success: true,
|
||||
logs: vec![Log {
|
||||
address: Address::ZERO,
|
||||
data: alloy_primitives::LogData::new_unchecked(vec![B256::ZERO], Bytes::new()),
|
||||
}],
|
||||
};
|
||||
|
||||
let expected = expected_root_bloom(core::slice::from_ref(&valid));
|
||||
|
||||
let mut handle = PostExecHandle::<Receipt>::new(&rt, 1);
|
||||
handle.push_receipt(0, valid);
|
||||
handle.push_receipt(999, invalid);
|
||||
handle.finish(HashedPostState::default, None::<fn() -> B256>);
|
||||
|
||||
assert_eq!(handle.receipt_root_bloom(), Some(expected));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn post_exec_handle_returns_none_for_incomplete_stream() {
|
||||
let rt = test_runtime();
|
||||
|
||||
let mut handle = PostExecHandle::<Receipt>::new(&rt, 2);
|
||||
handle.push_receipt(0, Receipt::default());
|
||||
// Finish with only 1 of 2 receipts — root should be None.
|
||||
handle.finish(HashedPostState::default, None::<fn() -> B256>);
|
||||
|
||||
assert!(handle.receipt_root_bloom().is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn post_exec_handle_with_hashed_post_state() {
|
||||
let rt = test_runtime();
|
||||
|
||||
let mut handle = PostExecHandle::<Receipt>::new(&rt, 0);
|
||||
let expected = HashedPostState::default();
|
||||
handle.finish(HashedPostState::default, None::<fn() -> B256>);
|
||||
|
||||
assert_eq!(handle.hashed_post_state(), &expected);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn post_exec_handle_with_transaction_root() {
|
||||
let rt = test_runtime();
|
||||
|
||||
let expected_root = B256::repeat_byte(0x42);
|
||||
let mut handle = PostExecHandle::<Receipt>::new(&rt, 0);
|
||||
handle.finish(HashedPostState::default, Some(move || expected_root));
|
||||
|
||||
assert_eq!(handle.transaction_root(), Some(expected_root));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn post_exec_handle_without_transaction_root() {
|
||||
let rt = test_runtime();
|
||||
|
||||
let mut handle = PostExecHandle::<Receipt>::new(&rt, 0);
|
||||
handle.finish(HashedPostState::default, None::<fn() -> B256>);
|
||||
|
||||
assert_eq!(handle.transaction_root(), None);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn post_exec_handle_parallel_blocks() {
|
||||
let rt = test_runtime();
|
||||
|
||||
let receipts_a = sample_receipts();
|
||||
let (expected_root_a, expected_bloom_a) = expected_root_bloom(&receipts_a);
|
||||
|
||||
let receipts_b = vec![Receipt::default(); 2];
|
||||
let (expected_root_b, expected_bloom_b) = expected_root_bloom(&receipts_b);
|
||||
|
||||
let mut handle_a = PostExecHandle::<Receipt>::new(&rt, receipts_a.len());
|
||||
let mut handle_b = PostExecHandle::<Receipt>::new(&rt, receipts_b.len());
|
||||
|
||||
for (index, receipt) in receipts_a.into_iter().enumerate() {
|
||||
handle_a.push_receipt(index, receipt);
|
||||
}
|
||||
for (index, receipt) in receipts_b.into_iter().enumerate() {
|
||||
handle_b.push_receipt(index, receipt);
|
||||
}
|
||||
|
||||
handle_a.finish(HashedPostState::default, None::<fn() -> B256>);
|
||||
handle_b.finish(HashedPostState::default, None::<fn() -> B256>);
|
||||
|
||||
let (root_a, bloom_a) = handle_a.receipt_root_bloom().unwrap();
|
||||
let (root_b, bloom_b) = handle_b.receipt_root_bloom().unwrap();
|
||||
|
||||
assert_eq!(root_a, expected_root_a);
|
||||
assert_eq!(bloom_a, expected_bloom_a);
|
||||
assert_eq!(root_b, expected_root_b);
|
||||
assert_eq!(bloom_b, expected_bloom_b);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn post_exec_handle_aborted_block_then_next_succeeds() {
|
||||
let rt = test_runtime();
|
||||
|
||||
// First block: aborted (dropped without finishing all receipts)
|
||||
let handle = PostExecHandle::<Receipt>::new(&rt, 2);
|
||||
handle.push_receipt(0, Receipt::default());
|
||||
drop(handle);
|
||||
|
||||
// Second block: succeeds
|
||||
let mut handle = PostExecHandle::<Receipt>::new(&rt, 1);
|
||||
handle.push_receipt(0, Receipt::default());
|
||||
handle.finish(HashedPostState::default, None::<fn() -> B256>);
|
||||
assert!(handle.receipt_root_bloom().is_some());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn lazy_hashed_post_state_get_and_try_into_inner() {
|
||||
let rt = test_runtime();
|
||||
|
||||
let mut handle = PostExecHandle::<Receipt>::new(&rt, 0);
|
||||
handle.finish(HashedPostState::default, None::<fn() -> B256>);
|
||||
|
||||
let lazy = handle.into_lazy_hashed_state();
|
||||
// handle is partially consumed but Drop is safe (hashed_post_state is now None)
|
||||
drop(handle);
|
||||
assert_eq!(lazy.get(), &HashedPostState::default());
|
||||
|
||||
let inner = lazy.try_into_inner().unwrap();
|
||||
assert_eq!(inner, HashedPostState::default());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn lazy_hashed_post_state_clone_prevents_try_into_inner() {
|
||||
let rt = test_runtime();
|
||||
|
||||
let mut handle = PostExecHandle::<Receipt>::new(&rt, 0);
|
||||
handle.finish(HashedPostState::default, None::<fn() -> B256>);
|
||||
|
||||
let lazy = handle.into_lazy_hashed_state();
|
||||
drop(handle);
|
||||
let _clone = lazy.clone();
|
||||
|
||||
// try_into_inner fails because there are multiple Arc references.
|
||||
let lazy = lazy.try_into_inner().unwrap_err();
|
||||
assert_eq!(lazy.get(), &HashedPostState::default());
|
||||
}
|
||||
}
|
||||
@@ -1,281 +0,0 @@
|
||||
//! Receipt root computation in a background task.
|
||||
//!
|
||||
//! This module provides a streaming receipt root builder that computes the receipt trie root
|
||||
//! in a background thread. Receipts are sent via a channel with their index, and for each
|
||||
//! receipt received, the builder incrementally flushes leaves to the underlying
|
||||
//! [`OrderedTrieRootEncodedBuilder`] when possible. When the channel closes, the task returns the
|
||||
//! computed root.
|
||||
|
||||
use alloy_eips::Encodable2718;
|
||||
use alloy_primitives::{Bloom, B256};
|
||||
use crossbeam_channel::Receiver;
|
||||
use reth_primitives_traits::Receipt;
|
||||
use reth_trie_common::ordered_root::OrderedTrieRootEncodedBuilder;
|
||||
use tokio::sync::oneshot;
|
||||
use tracing::debug_span;
|
||||
|
||||
/// Receipt with index, ready to be sent to the background task for encoding and trie building.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct IndexedReceipt<R> {
|
||||
/// The transaction index within the block.
|
||||
pub index: usize,
|
||||
/// The receipt.
|
||||
pub receipt: R,
|
||||
}
|
||||
|
||||
impl<R> IndexedReceipt<R> {
|
||||
/// Creates a new indexed receipt.
|
||||
#[inline]
|
||||
pub const fn new(index: usize, receipt: R) -> Self {
|
||||
Self { index, receipt }
|
||||
}
|
||||
}
|
||||
|
||||
/// Handle for running the receipt root computation in a background task.
|
||||
///
|
||||
/// This struct holds the channels needed to receive receipts and send the result.
|
||||
/// Use [`Self::run`] to execute the computation (typically in a spawned blocking task).
|
||||
#[derive(Debug)]
|
||||
pub struct ReceiptRootTaskHandle<R> {
|
||||
/// Receiver for indexed receipts.
|
||||
receipt_rx: Receiver<IndexedReceipt<R>>,
|
||||
/// Sender for the computed result.
|
||||
result_tx: oneshot::Sender<(B256, Bloom)>,
|
||||
}
|
||||
|
||||
impl<R: Receipt> ReceiptRootTaskHandle<R> {
|
||||
/// Creates a new handle from the receipt receiver and result sender channels.
|
||||
pub const fn new(
|
||||
receipt_rx: Receiver<IndexedReceipt<R>>,
|
||||
result_tx: oneshot::Sender<(B256, Bloom)>,
|
||||
) -> Self {
|
||||
Self { receipt_rx, result_tx }
|
||||
}
|
||||
|
||||
/// Runs the receipt root computation, consuming the handle.
|
||||
///
|
||||
/// This method receives indexed receipts from the channel, encodes them,
|
||||
/// and builds the trie incrementally. When all receipts have been received
|
||||
/// (channel closed), it sends the result through the oneshot channel.
|
||||
///
|
||||
/// This is designed to be called inside a blocking task (e.g., via
|
||||
/// `executor.spawn_blocking(move || handle.run(receipts_len))`).
|
||||
///
|
||||
/// # Arguments
|
||||
///
|
||||
/// * `receipts_len` - The total number of receipts expected. This is needed to correctly order
|
||||
/// the trie keys according to RLP encoding rules.
|
||||
pub fn run(self, receipts_len: usize) {
|
||||
let _span = debug_span!(
|
||||
target: "engine::tree::payload_processor",
|
||||
"receipt_root",
|
||||
receipts_len,
|
||||
)
|
||||
.entered();
|
||||
|
||||
let mut builder = OrderedTrieRootEncodedBuilder::new(receipts_len);
|
||||
let mut aggregated_bloom = Bloom::ZERO;
|
||||
let mut encode_buf = Vec::new();
|
||||
let mut received_count = 0usize;
|
||||
|
||||
for indexed_receipt in self.receipt_rx {
|
||||
let receipt_with_bloom = indexed_receipt.receipt.with_bloom_ref();
|
||||
|
||||
encode_buf.clear();
|
||||
receipt_with_bloom.encode_2718(&mut encode_buf);
|
||||
|
||||
aggregated_bloom |= *receipt_with_bloom.bloom_ref();
|
||||
match builder.push(indexed_receipt.index, &encode_buf) {
|
||||
Ok(()) => {
|
||||
received_count += 1;
|
||||
}
|
||||
Err(err) => {
|
||||
// If a duplicate or out-of-bounds index is streamed, skip it and
|
||||
// fall back to computing the receipt root from the full receipts
|
||||
// vector later.
|
||||
tracing::error!(
|
||||
target: "engine::tree::payload_processor",
|
||||
index = indexed_receipt.index,
|
||||
?err,
|
||||
"Receipt root task received invalid receipt index, skipping"
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let Ok(root) = builder.finalize() else {
|
||||
// Finalize fails if we didn't receive exactly `receipts_len` receipts. This can
|
||||
// happen if execution was aborted early (e.g., invalid transaction encountered).
|
||||
// We return without sending a result, allowing the caller to handle the abort.
|
||||
tracing::error!(
|
||||
target: "engine::tree::payload_processor",
|
||||
expected = receipts_len,
|
||||
received = received_count,
|
||||
"Receipt root task received incomplete receipts, execution likely aborted"
|
||||
);
|
||||
return;
|
||||
};
|
||||
let _ = self.result_tx.send((root, aggregated_bloom));
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use alloy_consensus::{proofs::calculate_receipt_root, TxReceipt};
|
||||
use alloy_primitives::{b256, hex, Address, Bytes, Log};
|
||||
use crossbeam_channel::bounded;
|
||||
use reth_ethereum_primitives::{Receipt, TxType};
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_receipt_root_task_empty() {
|
||||
let (_tx, rx) = bounded::<IndexedReceipt<Receipt>>(1);
|
||||
let (result_tx, result_rx) = oneshot::channel();
|
||||
drop(_tx);
|
||||
|
||||
let handle = ReceiptRootTaskHandle::new(rx, result_tx);
|
||||
tokio::task::spawn_blocking(move || handle.run(0)).await.unwrap();
|
||||
|
||||
let (root, bloom) = result_rx.await.unwrap();
|
||||
|
||||
// Empty trie root
|
||||
assert_eq!(root, reth_trie_common::EMPTY_ROOT_HASH);
|
||||
assert_eq!(bloom, Bloom::ZERO);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_receipt_root_task_single_receipt() {
|
||||
let receipts: Vec<Receipt> = vec![Receipt::default()];
|
||||
|
||||
let (tx, rx) = bounded(1);
|
||||
let (result_tx, result_rx) = oneshot::channel();
|
||||
let receipts_len = receipts.len();
|
||||
|
||||
let handle = ReceiptRootTaskHandle::new(rx, result_tx);
|
||||
let join_handle = tokio::task::spawn_blocking(move || handle.run(receipts_len));
|
||||
|
||||
for (i, receipt) in receipts.clone().into_iter().enumerate() {
|
||||
tx.send(IndexedReceipt::new(i, receipt)).unwrap();
|
||||
}
|
||||
drop(tx);
|
||||
|
||||
join_handle.await.unwrap();
|
||||
let (root, _bloom) = result_rx.await.unwrap();
|
||||
|
||||
// Verify against the standard calculation
|
||||
let receipts_with_bloom: Vec<_> = receipts.iter().map(|r| r.with_bloom_ref()).collect();
|
||||
let expected_root = calculate_receipt_root(&receipts_with_bloom);
|
||||
|
||||
assert_eq!(root, expected_root);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_receipt_root_task_multiple_receipts() {
|
||||
let receipts: Vec<Receipt> = vec![Receipt::default(); 5];
|
||||
|
||||
let (tx, rx) = bounded(4);
|
||||
let (result_tx, result_rx) = oneshot::channel();
|
||||
let receipts_len = receipts.len();
|
||||
|
||||
let handle = ReceiptRootTaskHandle::new(rx, result_tx);
|
||||
let join_handle = tokio::task::spawn_blocking(move || handle.run(receipts_len));
|
||||
|
||||
for (i, receipt) in receipts.into_iter().enumerate() {
|
||||
tx.send(IndexedReceipt::new(i, receipt)).unwrap();
|
||||
}
|
||||
drop(tx);
|
||||
|
||||
join_handle.await.unwrap();
|
||||
let (root, bloom) = result_rx.await.unwrap();
|
||||
|
||||
// Verify against expected values from existing test
|
||||
assert_eq!(
|
||||
root,
|
||||
b256!("0x61353b4fb714dc1fccacbf7eafc4273e62f3d1eed716fe41b2a0cd2e12c63ebc")
|
||||
);
|
||||
assert_eq!(
|
||||
bloom,
|
||||
Bloom::from(hex!("00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"))
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_receipt_root_matches_standard_calculation() {
|
||||
// Create some receipts with actual data
|
||||
let receipts = vec![
|
||||
Receipt {
|
||||
tx_type: TxType::Legacy,
|
||||
cumulative_gas_used: 21000,
|
||||
success: true,
|
||||
logs: vec![],
|
||||
},
|
||||
Receipt {
|
||||
tx_type: TxType::Eip1559,
|
||||
cumulative_gas_used: 42000,
|
||||
success: true,
|
||||
logs: vec![Log {
|
||||
address: Address::ZERO,
|
||||
data: alloy_primitives::LogData::new_unchecked(vec![B256::ZERO], Bytes::new()),
|
||||
}],
|
||||
},
|
||||
Receipt {
|
||||
tx_type: TxType::Eip2930,
|
||||
cumulative_gas_used: 63000,
|
||||
success: false,
|
||||
logs: vec![],
|
||||
},
|
||||
];
|
||||
|
||||
// Calculate expected values first (before we move receipts)
|
||||
let receipts_with_bloom: Vec<_> = receipts.iter().map(|r| r.with_bloom_ref()).collect();
|
||||
let expected_root = calculate_receipt_root(&receipts_with_bloom);
|
||||
let expected_bloom =
|
||||
receipts_with_bloom.iter().fold(Bloom::ZERO, |bloom, r| bloom | r.bloom_ref());
|
||||
|
||||
// Calculate using the task
|
||||
let (tx, rx) = bounded(4);
|
||||
let (result_tx, result_rx) = oneshot::channel();
|
||||
let receipts_len = receipts.len();
|
||||
|
||||
let handle = ReceiptRootTaskHandle::new(rx, result_tx);
|
||||
let join_handle = tokio::task::spawn_blocking(move || handle.run(receipts_len));
|
||||
|
||||
for (i, receipt) in receipts.into_iter().enumerate() {
|
||||
tx.send(IndexedReceipt::new(i, receipt)).unwrap();
|
||||
}
|
||||
drop(tx);
|
||||
|
||||
join_handle.await.unwrap();
|
||||
let (task_root, task_bloom) = result_rx.await.unwrap();
|
||||
|
||||
assert_eq!(task_root, expected_root);
|
||||
assert_eq!(task_bloom, expected_bloom);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_receipt_root_task_out_of_order() {
|
||||
let receipts: Vec<Receipt> = vec![Receipt::default(); 5];
|
||||
|
||||
// Calculate expected values first (before we move receipts)
|
||||
let receipts_with_bloom: Vec<_> = receipts.iter().map(|r| r.with_bloom_ref()).collect();
|
||||
let expected_root = calculate_receipt_root(&receipts_with_bloom);
|
||||
|
||||
let (tx, rx) = bounded(4);
|
||||
let (result_tx, result_rx) = oneshot::channel();
|
||||
let receipts_len = receipts.len();
|
||||
|
||||
let handle = ReceiptRootTaskHandle::new(rx, result_tx);
|
||||
let join_handle = tokio::task::spawn_blocking(move || handle.run(receipts_len));
|
||||
|
||||
// Send in reverse order to test out-of-order handling
|
||||
for (i, receipt) in receipts.into_iter().enumerate().rev() {
|
||||
tx.send(IndexedReceipt::new(i, receipt)).unwrap();
|
||||
}
|
||||
drop(tx);
|
||||
|
||||
join_handle.await.unwrap();
|
||||
let (root, _bloom) = result_rx.await.unwrap();
|
||||
|
||||
assert_eq!(root, expected_root);
|
||||
}
|
||||
}
|
||||
@@ -18,7 +18,7 @@ use alloy_primitives::B256;
|
||||
#[cfg(feature = "trie-debug")]
|
||||
use reth_trie_sparse::debug_recorder::TrieDebugRecorder;
|
||||
|
||||
use crate::tree::payload_processor::receipt_root_task::{IndexedReceipt, ReceiptRootTaskHandle};
|
||||
use crate::tree::payload_processor::post_exec::PostExecHandle;
|
||||
use reth_chain_state::{CanonicalInMemoryState, DeferredTrieData, ExecutedBlock, LazyOverlay};
|
||||
use reth_consensus::{ConsensusError, FullConsensus, ReceiptRootBloom};
|
||||
use reth_engine_primitives::{
|
||||
@@ -497,9 +497,9 @@ where
|
||||
}
|
||||
|
||||
// Execute the block and handle any execution errors.
|
||||
// The receipt root task is spawned before execution and receives receipts incrementally
|
||||
// as transactions complete, allowing parallel computation during execution.
|
||||
let (output, senders, receipt_root_rx) =
|
||||
// The post-exec handle manages receipt root computation in a background worker,
|
||||
// receiving receipts incrementally as transactions complete.
|
||||
let (output, senders, mut post_exec) =
|
||||
match self.execute_block(state_provider, env, &input, &mut handle) {
|
||||
Ok(output) => output,
|
||||
Err(err) => return self.handle_execution_error(input, err, &parent_block),
|
||||
@@ -517,59 +517,40 @@ where
|
||||
// needed. This frees up resources while state root computation continues.
|
||||
let valid_block_tx = handle.terminate_caching(Some(output.clone()));
|
||||
|
||||
// Spawn hashed post state computation in background so it runs concurrently with
|
||||
// block conversion and receipt root computation. This is a pure CPU-bound task
|
||||
// (keccak256 hashing of all changed addresses and storage slots).
|
||||
// Spawn hashed post state and (for payloads) transaction root on separate threads,
|
||||
// in parallel with receipt-root finalization. Dropping the channel closes the receipt
|
||||
// stream.
|
||||
let hashed_state_output = output.clone();
|
||||
let hashed_state_provider = self.provider.clone();
|
||||
let hashed_state: LazyHashedPostState =
|
||||
self.payload_processor.executor().spawn_blocking_named("hash-post-state", move || {
|
||||
let block = convert_to_block(input)?;
|
||||
let tx_root_fn = is_payload.then(|| {
|
||||
let block = block.clone();
|
||||
let parent_span = Span::current();
|
||||
let num_hash = block.num_hash();
|
||||
move || {
|
||||
let _span =
|
||||
debug_span!(target: "engine::tree::payload_validator", parent: parent_span, "payload_tx_root", block = ?num_hash)
|
||||
.entered();
|
||||
block.body().calculate_tx_root()
|
||||
}
|
||||
});
|
||||
post_exec.finish(
|
||||
move || {
|
||||
let _span = debug_span!(
|
||||
target: "engine::tree::payload_validator",
|
||||
"hashed_post_state",
|
||||
)
|
||||
.entered();
|
||||
hashed_state_provider.hashed_post_state(&hashed_state_output.state)
|
||||
});
|
||||
},
|
||||
tx_root_fn,
|
||||
);
|
||||
|
||||
let block = convert_to_block(input)?;
|
||||
let transaction_root = is_payload.then(|| {
|
||||
let block = block.clone();
|
||||
let parent_span = Span::current();
|
||||
let num_hash = block.num_hash();
|
||||
self.payload_processor.executor().spawn_blocking_named("payload-tx-root", move || {
|
||||
let _span =
|
||||
debug_span!(target: "engine::tree::payload_validator", parent: parent_span, "payload_tx_root", block = ?num_hash)
|
||||
.entered();
|
||||
block.body().calculate_tx_root()
|
||||
})
|
||||
});
|
||||
let block = block.with_senders(senders);
|
||||
|
||||
// Wait for the receipt root computation to complete.
|
||||
let receipt_root_bloom = {
|
||||
let _enter = debug_span!(
|
||||
target: "engine::tree::payload_validator",
|
||||
"wait_receipt_root",
|
||||
)
|
||||
.entered();
|
||||
|
||||
receipt_root_rx
|
||||
.blocking_recv()
|
||||
.inspect_err(|_| {
|
||||
tracing::error!(
|
||||
target: "engine::tree::payload_validator",
|
||||
"Receipt root task dropped sender without result, receipt root calculation likely aborted"
|
||||
);
|
||||
})
|
||||
.ok()
|
||||
};
|
||||
let transaction_root = transaction_root.map(|handle| {
|
||||
let _span =
|
||||
debug_span!(target: "engine::tree::payload_validator", "wait_payload_tx_root")
|
||||
.entered();
|
||||
handle.try_into_inner().expect("sole handle")
|
||||
});
|
||||
let receipt_root_bloom = post_exec.receipt_root_bloom();
|
||||
let transaction_root = post_exec.transaction_root();
|
||||
let hashed_state: LazyHashedPostState = post_exec.into_lazy_hashed_state();
|
||||
|
||||
let hashed_state = ensure_ok_post_block!(
|
||||
self.validate_post_execution(
|
||||
@@ -812,11 +793,7 @@ where
|
||||
input: &BlockOrPayload<T>,
|
||||
handle: &mut PayloadHandle<impl ExecutableTxFor<Evm>, Err, N::Receipt>,
|
||||
) -> Result<
|
||||
(
|
||||
BlockExecutionOutput<N::Receipt>,
|
||||
Vec<Address>,
|
||||
tokio::sync::oneshot::Receiver<(B256, alloy_primitives::Bloom)>,
|
||||
),
|
||||
(BlockExecutionOutput<N::Receipt>, Vec<Address>, PostExecHandle<N::Receipt>),
|
||||
InsertBlockErrorKind,
|
||||
>
|
||||
where
|
||||
@@ -865,15 +842,10 @@ where
|
||||
);
|
||||
}
|
||||
|
||||
// Spawn background task to compute receipt root and logs bloom incrementally.
|
||||
// Unbounded channel is used since tx count bounds capacity anyway (max ~30k txs per block).
|
||||
// Create a unified post-exec handle that manages receipt root, hashed post state,
|
||||
// and transaction root computation in parallel background tasks.
|
||||
let receipts_len = input.transaction_count();
|
||||
let (receipt_tx, receipt_rx) = crossbeam_channel::unbounded();
|
||||
let (result_tx, result_rx) = tokio::sync::oneshot::channel();
|
||||
let task_handle = ReceiptRootTaskHandle::new(receipt_rx, result_tx);
|
||||
self.payload_processor
|
||||
.executor()
|
||||
.spawn_blocking_named("receipt-root", move || task_handle.run(receipts_len));
|
||||
let post_exec = self.payload_processor.post_exec_handle(receipts_len);
|
||||
|
||||
let transaction_count = input.transaction_count();
|
||||
let executed_tx_index = Arc::clone(handle.executed_tx_index());
|
||||
@@ -888,10 +860,9 @@ where
|
||||
executor,
|
||||
transaction_count,
|
||||
handle.iter_transactions(),
|
||||
&receipt_tx,
|
||||
&post_exec,
|
||||
&executed_tx_index,
|
||||
)?;
|
||||
drop(receipt_tx);
|
||||
|
||||
// Finish execution and get the result
|
||||
let post_exec_start = Instant::now();
|
||||
@@ -911,10 +882,10 @@ where
|
||||
self.metrics.record_block_execution_gas_bucket(output.result.gas_used, execution_duration);
|
||||
|
||||
debug!(target: "engine::tree::payload_validator", elapsed = ?execution_duration, "Executed block");
|
||||
Ok((output, senders, result_rx))
|
||||
Ok((output, senders, post_exec))
|
||||
}
|
||||
|
||||
/// Executes transactions and collects senders, streaming receipts to a background task.
|
||||
/// Executes transactions and collects senders, streaming receipts to the post-exec worker.
|
||||
///
|
||||
/// This method handles:
|
||||
/// - Applying pre-execution changes (e.g., beacon root updates)
|
||||
@@ -928,7 +899,7 @@ where
|
||||
mut executor: E,
|
||||
transaction_count: usize,
|
||||
transactions: impl Iterator<Item = Result<Tx, Err>>,
|
||||
receipt_tx: &crossbeam_channel::Sender<IndexedReceipt<N::Receipt>>,
|
||||
post_exec: &PostExecHandle<N::Receipt>,
|
||||
executed_tx_index: &AtomicUsize,
|
||||
) -> Result<(E, Vec<Address>), BlockExecutionError>
|
||||
where
|
||||
@@ -982,10 +953,11 @@ where
|
||||
let current_len = executor.receipts().len();
|
||||
if current_len > last_sent_len {
|
||||
last_sent_len = current_len;
|
||||
// Send the latest receipt to the background task for incremental root computation.
|
||||
// Stream the latest receipt to the post-exec worker for incremental root
|
||||
// computation.
|
||||
if let Some(receipt) = executor.receipts().last() {
|
||||
let tx_index = current_len - 1;
|
||||
let _ = receipt_tx.send(IndexedReceipt::new(tx_index, receipt.clone()));
|
||||
post_exec.push_receipt(tx_index, receipt.clone());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user