feat(consensus): incremental receipt root computation in background task (#21131)

This commit is contained in:
Brian Picciano
2026-01-16 20:53:59 +01:00
committed by GitHub
parent 6e6415690c
commit 13707faf1a
20 changed files with 784 additions and 57 deletions

1
Cargo.lock generated
View File

@@ -8506,6 +8506,7 @@ dependencies = [
"reth-testing-utils",
"reth-tracing",
"reth-trie",
"reth-trie-common",
"reth-trie-db",
"reth-trie-parallel",
"reth-trie-sparse",

View File

@@ -152,7 +152,7 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + Hardforks + EthereumHardforks>
};
if let Err(err) = consensus
.validate_block_post_execution(&block, &result)
.validate_block_post_execution(&block, &result, None)
.wrap_err_with(|| {
format!("Failed to validate block {} {}", block.number(), block.hash())
})

View File

@@ -15,6 +15,12 @@ use alloc::{boxed::Box, fmt::Debug, string::String, sync::Arc, vec::Vec};
use alloy_consensus::Header;
use alloy_primitives::{BlockHash, BlockNumber, Bloom, B256};
use core::error::Error;
/// Pre-computed receipt root and logs bloom.
///
/// When provided to [`FullConsensus::validate_block_post_execution`], this allows skipping
/// the receipt root computation and using the pre-computed values instead.
pub type ReceiptRootBloom = (B256, Bloom);
use reth_execution_types::BlockExecutionResult;
use reth_primitives_traits::{
constants::{GAS_LIMIT_BOUND_DIVISOR, MAXIMUM_GAS_LIMIT_BLOCK, MINIMUM_GAS_LIMIT},
@@ -39,11 +45,15 @@ pub trait FullConsensus<N: NodePrimitives>: Consensus<N::Block> {
///
/// See the Yellow Paper sections 4.3.2 "Holistic Validity".
///
/// If `receipt_root_bloom` is provided, the implementation should use the pre-computed
/// receipt root and logs bloom instead of computing them from the receipts.
///
/// Note: validating blocks does not include other validations of the Consensus
fn validate_block_post_execution(
&self,
block: &RecoveredBlock<N::Block>,
result: &BlockExecutionResult<N::Receipt>,
receipt_root_bloom: Option<ReceiptRootBloom>,
) -> Result<(), ConsensusError>;
}

View File

@@ -18,7 +18,7 @@
//!
//! **Not for production use** - provides no security guarantees or consensus validation.
use crate::{Consensus, ConsensusError, FullConsensus, HeaderValidator};
use crate::{Consensus, ConsensusError, FullConsensus, HeaderValidator, ReceiptRootBloom};
use alloc::sync::Arc;
use reth_execution_types::BlockExecutionResult;
use reth_primitives_traits::{Block, NodePrimitives, RecoveredBlock, SealedBlock, SealedHeader};
@@ -76,6 +76,7 @@ impl<N: NodePrimitives> FullConsensus<N> for NoopConsensus {
&self,
_block: &RecoveredBlock<N::Block>,
_result: &BlockExecutionResult<N::Receipt>,
_receipt_root_bloom: Option<ReceiptRootBloom>,
) -> Result<(), ConsensusError> {
Ok(())
}

View File

@@ -1,4 +1,4 @@
use crate::{Consensus, ConsensusError, FullConsensus, HeaderValidator};
use crate::{Consensus, ConsensusError, FullConsensus, HeaderValidator, ReceiptRootBloom};
use core::sync::atomic::{AtomicBool, Ordering};
use reth_execution_types::BlockExecutionResult;
use reth_primitives_traits::{Block, NodePrimitives, RecoveredBlock, SealedBlock, SealedHeader};
@@ -51,6 +51,7 @@ impl<N: NodePrimitives> FullConsensus<N> for TestConsensus {
&self,
_block: &RecoveredBlock<N::Block>,
_result: &BlockExecutionResult<N::Receipt>,
_receipt_root_bloom: Option<ReceiptRootBloom>,
) -> Result<(), ConsensusError> {
if self.fail_validation() {
Err(ConsensusError::BaseFeeMissing)

View File

@@ -34,6 +34,7 @@ reth-trie-parallel.workspace = true
reth-trie-sparse = { workspace = true, features = ["std", "metrics"] }
reth-trie-sparse-parallel = { workspace = true, features = ["std"] }
reth-trie.workspace = true
reth-trie-common.workspace = true
reth-trie-db.workspace = true
# alloy
@@ -134,6 +135,7 @@ test-utils = [
"reth-static-file",
"reth-tracing",
"reth-trie/test-utils",
"reth-trie-common/test-utils",
"reth-trie-db/test-utils",
"reth-trie-sparse/test-utils",
"reth-prune-types?/test-utils",

View File

@@ -60,16 +60,22 @@ impl EngineApiMetrics {
///
/// This method updates metrics for execution time, gas usage, and the number
/// of accounts, storage slots and bytecodes loaded and updated.
pub(crate) fn execute_metered<E, DB>(
///
/// The optional `on_receipt` callback is invoked after each transaction with the receipt
/// index and a reference to all receipts collected so far. This allows callers to stream
/// receipts to a background task for incremental receipt root computation.
pub(crate) fn execute_metered<E, DB, F>(
&self,
executor: E,
mut transactions: impl Iterator<Item = Result<impl ExecutableTx<E>, BlockExecutionError>>,
transaction_count: usize,
state_hook: Box<dyn OnStateHook>,
mut on_receipt: F,
) -> Result<(BlockExecutionOutput<E::Receipt>, Vec<Address>), BlockExecutionError>
where
DB: alloy_evm::Database,
E: BlockExecutor<Evm: Evm<DB: BorrowMut<State<DB>>>, Transaction: SignedTransaction>,
F: FnMut(&[E::Receipt]),
{
// clone here is cheap, all the metrics are Option<Arc<_>>. additionally
// they are globally registered so that the data recorded in the hook will
@@ -103,6 +109,9 @@ impl EngineApiMetrics {
let gas_used = executor.execute_transaction(tx)?;
self.executor.transaction_execution_histogram.record(start.elapsed());
// Invoke callback with the latest receipt
on_receipt(executor.receipts());
// record the tx gas used
enter.record("gas_used", gas_used);
}
@@ -536,11 +545,12 @@ mod tests {
let executor = MockExecutor::new(state);
// This will fail to create the EVM but should still call the hook
let _result = metrics.execute_metered::<_, EmptyDB>(
let _result = metrics.execute_metered::<_, EmptyDB, _>(
executor,
input.clone_transactions_recovered().map(Ok::<_, BlockExecutionError>),
input.transaction_count(),
state_hook,
|_| {},
);
// Check if hook was called (it might not be if finish() fails early)
@@ -595,11 +605,12 @@ mod tests {
let executor = MockExecutor::new(state);
// Execute (will fail but should still update some metrics)
let _result = metrics.execute_metered::<_, EmptyDB>(
let _result = metrics.execute_metered::<_, EmptyDB, _>(
executor,
input.clone_transactions_recovered().map(Ok::<_, BlockExecutionError>),
input.transaction_count(),
state_hook,
|_| {},
);
let snapshot = snapshotter.snapshot().into_vec();

View File

@@ -61,6 +61,7 @@ mod configured_sparse_trie;
pub mod executor;
pub mod multiproof;
pub mod prewarm;
pub mod receipt_root_task;
pub mod sparse_trie;
use configured_sparse_trie::ConfiguredSparseTrie;

View File

@@ -0,0 +1,250 @@
//! Receipt root computation in a background task.
//!
//! This module provides a streaming receipt root builder that computes the receipt trie root
//! in a background thread. Receipts are sent via a channel with their index, and for each
//! receipt received, the builder incrementally flushes leaves to the underlying
//! [`OrderedTrieRootEncodedBuilder`] when possible. When the channel closes, the task returns the
//! computed root.
use alloy_eips::Encodable2718;
use alloy_primitives::{Bloom, B256};
use crossbeam_channel::Receiver;
use reth_primitives_traits::Receipt;
use reth_trie_common::ordered_root::OrderedTrieRootEncodedBuilder;
use tokio::sync::oneshot;
/// Receipt with index, ready to be sent to the background task for encoding and trie building.
#[derive(Debug, Clone)]
pub struct IndexedReceipt<R> {
/// The transaction index within the block.
pub index: usize,
/// The receipt.
pub receipt: R,
}
impl<R> IndexedReceipt<R> {
/// Creates a new indexed receipt.
#[inline]
pub const fn new(index: usize, receipt: R) -> Self {
Self { index, receipt }
}
}
/// Handle for running the receipt root computation in a background task.
///
/// This struct holds the channels needed to receive receipts and send the result.
/// Use [`Self::run`] to execute the computation (typically in a spawned blocking task).
#[derive(Debug)]
pub struct ReceiptRootTaskHandle<R> {
/// Receiver for indexed receipts.
receipt_rx: Receiver<IndexedReceipt<R>>,
/// Sender for the computed result.
result_tx: oneshot::Sender<(B256, Bloom)>,
}
impl<R: Receipt> ReceiptRootTaskHandle<R> {
/// Creates a new handle from the receipt receiver and result sender channels.
pub const fn new(
receipt_rx: Receiver<IndexedReceipt<R>>,
result_tx: oneshot::Sender<(B256, Bloom)>,
) -> Self {
Self { receipt_rx, result_tx }
}
/// Runs the receipt root computation, consuming the handle.
///
/// This method receives indexed receipts from the channel, encodes them,
/// and builds the trie incrementally. When all receipts have been received
/// (channel closed), it sends the result through the oneshot channel.
///
/// This is designed to be called inside a blocking task (e.g., via
/// `executor.spawn_blocking(move || handle.run(receipts_len))`).
///
/// # Arguments
///
/// * `receipts_len` - The total number of receipts expected. This is needed to correctly order
/// the trie keys according to RLP encoding rules.
///
/// # Panics
///
/// Panics if the number of receipts received doesn't match `receipts_len`.
pub fn run(self, receipts_len: usize) {
let mut builder = OrderedTrieRootEncodedBuilder::new(receipts_len);
let mut aggregated_bloom = Bloom::ZERO;
let mut encode_buf = Vec::new();
for indexed_receipt in self.receipt_rx {
let receipt_with_bloom = indexed_receipt.receipt.with_bloom_ref();
encode_buf.clear();
receipt_with_bloom.encode_2718(&mut encode_buf);
aggregated_bloom |= *receipt_with_bloom.bloom_ref();
builder.push_unchecked(indexed_receipt.index, &encode_buf);
}
let root = builder.finalize().expect("receipt root builder incomplete");
let _ = self.result_tx.send((root, aggregated_bloom));
}
}
#[cfg(test)]
mod tests {
use super::*;
use alloy_consensus::{proofs::calculate_receipt_root, TxReceipt};
use alloy_primitives::{b256, hex, Address, Bytes, Log};
use crossbeam_channel::bounded;
use reth_ethereum_primitives::{Receipt, TxType};
#[tokio::test]
async fn test_receipt_root_task_empty() {
let (_tx, rx) = bounded::<IndexedReceipt<Receipt>>(1);
let (result_tx, result_rx) = oneshot::channel();
drop(_tx);
let handle = ReceiptRootTaskHandle::new(rx, result_tx);
tokio::task::spawn_blocking(move || handle.run(0)).await.unwrap();
let (root, bloom) = result_rx.await.unwrap();
// Empty trie root
assert_eq!(root, reth_trie_common::EMPTY_ROOT_HASH);
assert_eq!(bloom, Bloom::ZERO);
}
#[tokio::test]
async fn test_receipt_root_task_single_receipt() {
let receipts: Vec<Receipt> = vec![Receipt::default()];
let (tx, rx) = bounded(1);
let (result_tx, result_rx) = oneshot::channel();
let receipts_len = receipts.len();
let handle = ReceiptRootTaskHandle::new(rx, result_tx);
let join_handle = tokio::task::spawn_blocking(move || handle.run(receipts_len));
for (i, receipt) in receipts.clone().into_iter().enumerate() {
tx.send(IndexedReceipt::new(i, receipt)).unwrap();
}
drop(tx);
join_handle.await.unwrap();
let (root, _bloom) = result_rx.await.unwrap();
// Verify against the standard calculation
let receipts_with_bloom: Vec<_> = receipts.iter().map(|r| r.with_bloom_ref()).collect();
let expected_root = calculate_receipt_root(&receipts_with_bloom);
assert_eq!(root, expected_root);
}
#[tokio::test]
async fn test_receipt_root_task_multiple_receipts() {
let receipts: Vec<Receipt> = vec![Receipt::default(); 5];
let (tx, rx) = bounded(4);
let (result_tx, result_rx) = oneshot::channel();
let receipts_len = receipts.len();
let handle = ReceiptRootTaskHandle::new(rx, result_tx);
let join_handle = tokio::task::spawn_blocking(move || handle.run(receipts_len));
for (i, receipt) in receipts.into_iter().enumerate() {
tx.send(IndexedReceipt::new(i, receipt)).unwrap();
}
drop(tx);
join_handle.await.unwrap();
let (root, bloom) = result_rx.await.unwrap();
// Verify against expected values from existing test
assert_eq!(
root,
b256!("0x61353b4fb714dc1fccacbf7eafc4273e62f3d1eed716fe41b2a0cd2e12c63ebc")
);
assert_eq!(
bloom,
Bloom::from(hex!("00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"))
);
}
#[tokio::test]
async fn test_receipt_root_matches_standard_calculation() {
// Create some receipts with actual data
let receipts = vec![
Receipt {
tx_type: TxType::Legacy,
cumulative_gas_used: 21000,
success: true,
logs: vec![],
},
Receipt {
tx_type: TxType::Eip1559,
cumulative_gas_used: 42000,
success: true,
logs: vec![Log {
address: Address::ZERO,
data: alloy_primitives::LogData::new_unchecked(vec![B256::ZERO], Bytes::new()),
}],
},
Receipt {
tx_type: TxType::Eip2930,
cumulative_gas_used: 63000,
success: false,
logs: vec![],
},
];
// Calculate expected values first (before we move receipts)
let receipts_with_bloom: Vec<_> = receipts.iter().map(|r| r.with_bloom_ref()).collect();
let expected_root = calculate_receipt_root(&receipts_with_bloom);
let expected_bloom =
receipts_with_bloom.iter().fold(Bloom::ZERO, |bloom, r| bloom | r.bloom_ref());
// Calculate using the task
let (tx, rx) = bounded(4);
let (result_tx, result_rx) = oneshot::channel();
let receipts_len = receipts.len();
let handle = ReceiptRootTaskHandle::new(rx, result_tx);
let join_handle = tokio::task::spawn_blocking(move || handle.run(receipts_len));
for (i, receipt) in receipts.into_iter().enumerate() {
tx.send(IndexedReceipt::new(i, receipt)).unwrap();
}
drop(tx);
join_handle.await.unwrap();
let (task_root, task_bloom) = result_rx.await.unwrap();
assert_eq!(task_root, expected_root);
assert_eq!(task_bloom, expected_bloom);
}
#[tokio::test]
async fn test_receipt_root_task_out_of_order() {
let receipts: Vec<Receipt> = vec![Receipt::default(); 5];
// Calculate expected values first (before we move receipts)
let receipts_with_bloom: Vec<_> = receipts.iter().map(|r| r.with_bloom_ref()).collect();
let expected_root = calculate_receipt_root(&receipts_with_bloom);
let (tx, rx) = bounded(4);
let (result_tx, result_rx) = oneshot::channel();
let receipts_len = receipts.len();
let handle = ReceiptRootTaskHandle::new(rx, result_tx);
let join_handle = tokio::task::spawn_blocking(move || handle.run(receipts_len));
// Send in reverse order to test out-of-order handling
for (i, receipt) in receipts.into_iter().enumerate().rev() {
tx.send(IndexedReceipt::new(i, receipt)).unwrap();
}
drop(tx);
join_handle.await.unwrap();
let (root, _bloom) = result_rx.await.unwrap();
assert_eq!(root, expected_root);
}
}

View File

@@ -15,9 +15,11 @@ use alloy_eip7928::BlockAccessList;
use alloy_eips::{eip1898::BlockWithParent, NumHash};
use alloy_evm::Evm;
use alloy_primitives::B256;
use crate::tree::payload_processor::receipt_root_task::{IndexedReceipt, ReceiptRootTaskHandle};
use rayon::prelude::*;
use reth_chain_state::{CanonicalInMemoryState, DeferredTrieData, ExecutedBlock, LazyOverlay};
use reth_consensus::{ConsensusError, FullConsensus};
use reth_consensus::{ConsensusError, FullConsensus, ReceiptRootBloom};
use reth_engine_primitives::{
ConfigureEngineEvm, ExecutableTxIterator, ExecutionPayload, InvalidBlockHook, PayloadValidator,
};
@@ -453,11 +455,14 @@ where
state_provider = Box::new(InstrumentedStateProvider::new(state_provider, "engine"));
}
// Execute the block and handle any execution errors
let (output, senders) = match self.execute_block(state_provider, env, &input, &mut handle) {
Ok(output) => output,
Err(err) => return self.handle_execution_error(input, err, &parent_block),
};
// Execute the block and handle any execution errors.
// The receipt root task is spawned before execution and receives receipts incrementally
// as transactions complete, allowing parallel computation during execution.
let (output, senders, receipt_root_rx) =
match self.execute_block(state_provider, env, &input, &mut handle) {
Ok(output) => output,
Err(err) => return self.handle_execution_error(input, err, &parent_block),
};
// After executing the block we can stop prewarming transactions
handle.stop_prewarming_execution();
@@ -473,8 +478,21 @@ where
let block = self.convert_to_block(input)?.with_senders(senders);
// Wait for the receipt root computation to complete.
let receipt_root_bloom = Some(
receipt_root_rx
.blocking_recv()
.expect("receipt root task dropped sender without result"),
);
let hashed_state = ensure_ok_post_block!(
self.validate_post_execution(&block, &parent_block, &output, &mut ctx),
self.validate_post_execution(
&block,
&parent_block,
&output,
&mut ctx,
receipt_root_bloom
),
block
);
@@ -622,13 +640,21 @@ where
/// Executes a block with the given state provider
#[instrument(level = "debug", target = "engine::tree::payload_validator", skip_all)]
#[expect(clippy::type_complexity)]
fn execute_block<S, Err, T>(
&mut self,
state_provider: S,
env: ExecutionEnv<Evm>,
input: &BlockOrPayload<T>,
handle: &mut PayloadHandle<impl ExecutableTxFor<Evm>, Err, N::Receipt>,
) -> Result<(BlockExecutionOutput<N::Receipt>, Vec<Address>), InsertBlockErrorKind>
) -> Result<
(
BlockExecutionOutput<N::Receipt>,
Vec<Address>,
tokio::sync::oneshot::Receiver<(B256, alloy_primitives::Bloom)>,
),
InsertBlockErrorKind,
>
where
S: StateProvider + Send,
Err: core::error::Error + Send + Sync + 'static,
@@ -667,6 +693,14 @@ where
});
}
// Spawn background task to compute receipt root and logs bloom incrementally.
// Unbounded channel is used since tx count bounds capacity anyway (max ~30k txs per block).
let receipts_len = input.transaction_count();
let (receipt_tx, receipt_rx) = crossbeam_channel::unbounded();
let (result_tx, result_rx) = tokio::sync::oneshot::channel();
let task_handle = ReceiptRootTaskHandle::new(receipt_rx, result_tx);
self.payload_processor.executor().spawn_blocking(move || task_handle.run(receipts_len));
let execution_start = Instant::now();
let state_hook = Box::new(handle.state_hook());
let (output, senders) = self.metrics.execute_metered(
@@ -674,11 +708,22 @@ where
handle.iter_transactions().map(|res| res.map_err(BlockExecutionError::other)),
input.transaction_count(),
state_hook,
|receipts| {
// Send the latest receipt to the background task for incremental root computation.
// The receipt is cloned here; encoding happens in the background thread.
if let Some(receipt) = receipts.last() {
// Infer tx_index from the number of receipts collected so far
let tx_index = receipts.len() - 1;
let _ = receipt_tx.send(IndexedReceipt::new(tx_index, receipt.clone()));
}
},
)?;
drop(receipt_tx);
let execution_finish = Instant::now();
let execution_time = execution_finish.duration_since(execution_start);
debug!(target: "engine::tree::payload_validator", elapsed = ?execution_time, "Executed block");
Ok((output, senders))
Ok((output, senders, result_rx))
}
/// Compute state root for the given hashed post state in parallel.
@@ -736,6 +781,9 @@ where
/// - parent header validation
/// - post-execution consensus validation
/// - state-root based post-execution validation
///
/// If `receipt_root_bloom` is provided, it will be used instead of computing the receipt root
/// and logs bloom from the receipts.
#[instrument(level = "debug", target = "engine::tree::payload_validator", skip_all)]
fn validate_post_execution<T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>>(
&self,
@@ -743,6 +791,7 @@ where
parent_block: &SealedHeader<N::BlockHeader>,
output: &BlockExecutionOutput<N::Receipt>,
ctx: &mut TreeCtx<'_, N>,
receipt_root_bloom: Option<ReceiptRootBloom>,
) -> Result<HashedPostState, InsertBlockErrorKind>
where
V: PayloadValidator<T, Block = N::Block>,
@@ -769,7 +818,9 @@ where
let _enter =
debug_span!(target: "engine::tree::payload_validator", "validate_block_post_execution")
.entered();
if let Err(err) = self.consensus.validate_block_post_execution(block, output) {
if let Err(err) =
self.consensus.validate_block_post_execution(block, output, receipt_root_bloom)
{
// call post-block hook
self.on_invalid_block(parent_block, block, output, None, ctx.state_mut());
return Err(err.into())

View File

@@ -15,7 +15,7 @@ use alloc::{fmt::Debug, sync::Arc};
use alloy_consensus::{constants::MAXIMUM_EXTRA_DATA_SIZE, EMPTY_OMMER_ROOT_HASH};
use alloy_eips::eip7840::BlobParams;
use reth_chainspec::{EthChainSpec, EthereumHardforks};
use reth_consensus::{Consensus, ConsensusError, FullConsensus, HeaderValidator};
use reth_consensus::{Consensus, ConsensusError, FullConsensus, HeaderValidator, ReceiptRootBloom};
use reth_consensus_common::validation::{
validate_4844_header_standalone, validate_against_parent_4844,
validate_against_parent_eip1559_base_fee, validate_against_parent_gas_limit,
@@ -74,8 +74,15 @@ where
&self,
block: &RecoveredBlock<N::Block>,
result: &BlockExecutionResult<N::Receipt>,
receipt_root_bloom: Option<ReceiptRootBloom>,
) -> Result<(), ConsensusError> {
validate_block_post_execution(block, &self.chain_spec, &result.receipts, &result.requests)
validate_block_post_execution(
block,
&self.chain_spec,
&result.receipts,
&result.requests,
receipt_root_bloom,
)
}
}

View File

@@ -12,11 +12,15 @@ use reth_primitives_traits::{
///
/// - Compares the receipts root in the block header to the block body
/// - Compares the gas used in the block header to the actual gas usage after execution
///
/// If `receipt_root_bloom` is provided, the pre-computed receipt root and logs bloom are used
/// instead of computing them from the receipts.
pub fn validate_block_post_execution<B, R, ChainSpec>(
block: &RecoveredBlock<B>,
chain_spec: &ChainSpec,
receipts: &[R],
requests: &Requests,
receipt_root_bloom: Option<(B256, Bloom)>,
) -> Result<(), ConsensusError>
where
B: Block,
@@ -37,19 +41,26 @@ where
// operation as hashing that is required for state root got calculated in every
// transaction This was replaced with is_success flag.
// See more about EIP here: https://eips.ethereum.org/EIPS/eip-658
if chain_spec.is_byzantium_active_at_block(block.header().number()) &&
let Err(error) = verify_receipts(
block.header().receipts_root(),
block.header().logs_bloom(),
receipts,
)
{
let receipts = receipts
.iter()
.map(|r| Bytes::from(r.with_bloom_ref().encoded_2718()))
.collect::<Vec<_>>();
tracing::debug!(%error, ?receipts, "receipts verification failed");
return Err(error)
if chain_spec.is_byzantium_active_at_block(block.header().number()) {
let result = if let Some((receipts_root, logs_bloom)) = receipt_root_bloom {
compare_receipts_root_and_logs_bloom(
receipts_root,
logs_bloom,
block.header().receipts_root(),
block.header().logs_bloom(),
)
} else {
verify_receipts(block.header().receipts_root(), block.header().logs_bloom(), receipts)
};
if let Err(error) = result {
let receipts = receipts
.iter()
.map(|r| Bytes::from(r.with_bloom_ref().encoded_2718()))
.collect::<Vec<_>>();
tracing::debug!(%error, ?receipts, "receipts verification failed");
return Err(error)
}
}
// Validate that the header requests hash matches the calculated requests hash

View File

@@ -18,7 +18,7 @@ use alloy_consensus::{
use alloy_primitives::B64;
use core::fmt::Debug;
use reth_chainspec::EthChainSpec;
use reth_consensus::{Consensus, ConsensusError, FullConsensus, HeaderValidator};
use reth_consensus::{Consensus, ConsensusError, FullConsensus, HeaderValidator, ReceiptRootBloom};
use reth_consensus_common::validation::{
validate_against_parent_eip1559_base_fee, validate_against_parent_hash_number,
validate_against_parent_timestamp, validate_cancun_gas, validate_header_base_fee,
@@ -79,8 +79,9 @@ where
&self,
block: &RecoveredBlock<N::Block>,
result: &BlockExecutionResult<N::Receipt>,
receipt_root_bloom: Option<ReceiptRootBloom>,
) -> Result<(), ConsensusError> {
validate_block_post_execution(block.header(), &self.chain_spec, result)
validate_block_post_execution(block.header(), &self.chain_spec, result, receipt_root_bloom)
}
}
@@ -410,7 +411,8 @@ mod tests {
let post_execution = <OpBeaconConsensus<OpChainSpec> as FullConsensus<OpPrimitives>>::validate_block_post_execution(
&beacon_consensus,
&block,
&result
&result,
None,
);
// validate blob, it should pass blob gas used validation
@@ -479,7 +481,8 @@ mod tests {
let post_execution = <OpBeaconConsensus<OpChainSpec> as FullConsensus<OpPrimitives>>::validate_block_post_execution(
&beacon_consensus,
&block,
&result
&result,
None,
);
// validate blob, it should fail blob gas used validation post execution.

View File

@@ -85,10 +85,14 @@ where
///
/// - Compares the receipts root in the block header to the block body
/// - Compares the gas used in the block header to the actual gas usage after execution
///
/// If `receipt_root_bloom` is provided, the pre-computed receipt root and logs bloom are used
/// instead of computing them from the receipts.
pub fn validate_block_post_execution<R: DepositReceipt>(
header: impl BlockHeader,
chain_spec: impl OpHardforks,
result: &BlockExecutionResult<R>,
receipt_root_bloom: Option<(B256, Bloom)>,
) -> Result<(), ConsensusError> {
// Validate that the blob gas used is present and correctly computed if Jovian is active.
if chain_spec.is_jovian_active_at_timestamp(header.timestamp()) {
@@ -110,21 +114,32 @@ pub fn validate_block_post_execution<R: DepositReceipt>(
// operation as hashing that is required for state root got calculated in every
// transaction This was replaced with is_success flag.
// See more about EIP here: https://eips.ethereum.org/EIPS/eip-658
if chain_spec.is_byzantium_active_at_block(header.number()) &&
let Err(error) = verify_receipts_optimism(
header.receipts_root(),
header.logs_bloom(),
receipts,
chain_spec,
header.timestamp(),
)
{
let receipts = receipts
.iter()
.map(|r| Bytes::from(r.with_bloom_ref().encoded_2718()))
.collect::<Vec<_>>();
tracing::debug!(%error, ?receipts, "receipts verification failed");
return Err(error)
if chain_spec.is_byzantium_active_at_block(header.number()) {
let result = if let Some((receipts_root, logs_bloom)) = receipt_root_bloom {
compare_receipts_root_and_logs_bloom(
receipts_root,
logs_bloom,
header.receipts_root(),
header.logs_bloom(),
)
} else {
verify_receipts_optimism(
header.receipts_root(),
header.logs_bloom(),
receipts,
chain_spec,
header.timestamp(),
)
};
if let Err(error) = result {
let receipts = receipts
.iter()
.map(|r| Bytes::from(r.with_bloom_ref().encoded_2718()))
.collect::<Vec<_>>();
tracing::debug!(%error, ?receipts, "receipts verification failed");
return Err(error)
}
}
// Check if gas used matches the value set in header.
@@ -543,7 +558,7 @@ mod tests {
requests: Requests::default(),
gas_used: GAS_USED,
};
validate_block_post_execution(&header, &chainspec, &result).unwrap();
validate_block_post_execution(&header, &chainspec, &result, None).unwrap();
}
#[test]
@@ -565,7 +580,7 @@ mod tests {
gas_used: GAS_USED,
};
assert!(matches!(
validate_block_post_execution(&header, &chainspec, &result).unwrap_err(),
validate_block_post_execution(&header, &chainspec, &result, None).unwrap_err(),
ConsensusError::BlobGasUsedDiff(diff)
if diff.got == BLOB_GAS_USED && diff.expected == BLOB_GAS_USED + 1
));

View File

@@ -201,7 +201,7 @@ where
// update the cached reads
self.update_cached_reads(parent_header_hash, request_cache).await;
self.consensus.validate_block_post_execution(&block, &output)?;
self.consensus.validate_block_post_execution(&block, &output, None)?;
self.ensure_payment(&block, &output, &message)?;

View File

@@ -351,7 +351,7 @@ where
})
})?;
if let Err(err) = self.consensus.validate_block_post_execution(&block, &result) {
if let Err(err) = self.consensus.validate_block_post_execution(&block, &result, None) {
return Err(StageError::Block {
block: Box::new(block.block_with_parent()),
error: BlockErrorKind::Validation(err),

View File

@@ -231,8 +231,14 @@ where
.map_err(|e| StatelessValidationError::StatelessExecutionFailed(e.to_string()))?;
// Post validation checks
validate_block_post_execution(&current_block, &chain_spec, &output.receipts, &output.requests)
.map_err(StatelessValidationError::ConsensusValidationFailed)?;
validate_block_post_execution(
&current_block,
&chain_spec,
&output.receipts,
&output.requests,
None,
)
.map_err(StatelessValidationError::ConsensusValidationFailed)?;
// Compute and check the post state root
let hashed_state = HashedPostState::from_bundle_state::<KeccakKeyHasher>(&output.state.state);

View File

@@ -55,6 +55,9 @@ pub use proofs::*;
pub mod root;
/// Incremental ordered trie root computation.
pub mod ordered_root;
/// Buffer for trie updates.
pub mod updates;

View File

@@ -0,0 +1,354 @@
//! Incremental ordered trie root computation.
//!
//! This module provides builders for computing ordered trie roots incrementally as items
//! arrive, rather than requiring all items upfront. This is useful for receipt root
//! calculation during block execution, where we know the total count but receive receipts
//! one by one as transactions are executed.
use crate::{HashBuilder, Nibbles, EMPTY_ROOT_HASH};
use alloc::vec::Vec;
use alloy_primitives::B256;
use alloy_trie::root::adjust_index_for_rlp;
use core::fmt;
/// Error returned when using [`OrderedTrieRootEncodedBuilder`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OrderedRootError {
/// Called `finalize()` before all items were pushed.
Incomplete {
/// The expected number of items.
expected: usize,
/// The number of items received.
received: usize,
},
/// Index is out of bounds.
IndexOutOfBounds {
/// The index that was provided.
index: usize,
/// The expected length.
len: usize,
},
/// Item at this index was already pushed.
DuplicateIndex {
/// The duplicate index.
index: usize,
},
}
impl OrderedRootError {
/// Returns `true` if the error is [`OrderedRootError::Incomplete`].
#[inline]
pub const fn is_incomplete(&self) -> bool {
matches!(self, Self::Incomplete { .. })
}
/// Returns `true` if the error is [`OrderedRootError::IndexOutOfBounds`].
#[inline]
pub const fn is_index_out_of_bounds(&self) -> bool {
matches!(self, Self::IndexOutOfBounds { .. })
}
/// Returns `true` if the error is [`OrderedRootError::DuplicateIndex`].
#[inline]
pub const fn is_duplicate_index(&self) -> bool {
matches!(self, Self::DuplicateIndex { .. })
}
/// Returns the index associated with the error, if any.
#[inline]
pub const fn index(&self) -> Option<usize> {
match self {
Self::Incomplete { .. } => None,
Self::IndexOutOfBounds { index, .. } | Self::DuplicateIndex { index } => Some(*index),
}
}
}
impl fmt::Display for OrderedRootError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Incomplete { expected, received } => {
write!(f, "incomplete: expected {expected} items, received {received}")
}
Self::IndexOutOfBounds { index, len } => {
write!(f, "index {index} out of bounds for length {len}")
}
Self::DuplicateIndex { index } => {
write!(f, "duplicate item at index {index}")
}
}
}
}
#[cfg(feature = "std")]
impl std::error::Error for OrderedRootError {}
/// A builder for computing ordered trie roots incrementally from pre-encoded items.
///
/// This builder allows pushing items one by one as they become available
/// (e.g., receipts after each transaction execution), rather than requiring
/// all items upfront.
///
/// # Use Case
///
/// When executing a block, the receipt root must be computed from all transaction
/// receipts. With the standard `ordered_trie_root`, you must wait until all
/// transactions are executed before computing the root. This builder enables
/// **incremental computation** - you can start building the trie as soon as
/// receipts become available, potentially in parallel with continued execution.
///
/// The builder requires knowing the total item count upfront (the number of
/// transactions in the block), but items can be pushed in any order by index.
///
/// # How It Works
///
/// Items can be pushed in any order by specifying their index. The builder
/// internally buffers items and flushes them to the underlying [`HashBuilder`]
/// in the correct order for RLP key encoding (as determined by [`adjust_index_for_rlp`]).
///
/// # Memory
///
/// Each pushed item is stored in an internal buffer until it can be flushed.
/// In the worst case (e.g., pushing index 0 last), all items except one will
/// be buffered. For receipt roots, index 0 is typically flushed late due to
/// RLP key ordering, so expect to buffer most items until near the end.
///
/// # Example
///
/// ```
/// use reth_trie_common::ordered_root::OrderedTrieRootEncodedBuilder;
///
/// // Create a builder for 2 pre-encoded items
/// let mut builder = OrderedTrieRootEncodedBuilder::new(2);
///
/// // Push pre-encoded items as they arrive (can be out of order)
/// builder.push(1, b"encoded_item_1").unwrap();
/// builder.push(0, b"encoded_item_0").unwrap();
///
/// // Finalize to get the root hash
/// let root = builder.finalize().unwrap();
/// ```
#[derive(Debug)]
pub struct OrderedTrieRootEncodedBuilder {
/// Total expected number of items.
len: usize,
/// Number of items received so far.
received: usize,
/// Next insertion loop counter (determines which adjusted index to flush next).
next_insert_i: usize,
/// Buffer for pending items, indexed by execution index.
pending: Vec<Option<Vec<u8>>>,
/// The underlying hash builder.
hb: HashBuilder,
}
impl OrderedTrieRootEncodedBuilder {
/// Creates a new builder for `len` pre-encoded items.
pub fn new(len: usize) -> Self {
Self {
len,
received: 0,
next_insert_i: 0,
pending: alloc::vec![None; len],
hb: HashBuilder::default(),
}
}
/// Pushes a pre-encoded item at the given index to the builder.
///
/// Items can be pushed in any order. The builder will automatically
/// flush items to the underlying [`HashBuilder`] when they become
/// available in the correct order.
///
/// # Errors
///
/// - [`OrderedRootError::IndexOutOfBounds`] if `index >= len`
/// - [`OrderedRootError::DuplicateIndex`] if an item was already pushed at this index
#[inline]
pub fn push(&mut self, index: usize, bytes: &[u8]) -> Result<(), OrderedRootError> {
if index >= self.len {
return Err(OrderedRootError::IndexOutOfBounds { index, len: self.len });
}
if self.pending[index].is_some() {
return Err(OrderedRootError::DuplicateIndex { index });
}
self.push_unchecked(index, bytes);
Ok(())
}
/// Pushes a pre-encoded item at the given index without bounds or duplicate checking.
///
/// This is a performance-critical method for callers that can guarantee:
/// - `index < len`
/// - No item has been pushed at this index before
///
/// # Panics
///
/// Panics in debug mode if `index >= len`.
#[inline]
pub fn push_unchecked(&mut self, index: usize, bytes: &[u8]) {
debug_assert!(index < self.len, "index {index} out of bounds for length {}", self.len);
debug_assert!(self.pending[index].is_none(), "duplicate item at index {index}");
self.pending[index] = Some(bytes.to_vec());
self.received += 1;
self.flush();
}
/// Attempts to flush pending items to the hash builder.
fn flush(&mut self) {
while self.next_insert_i < self.len {
let exec_index_needed = adjust_index_for_rlp(self.next_insert_i, self.len);
let Some(value) = self.pending[exec_index_needed].take() else {
break;
};
let index_buffer = alloy_rlp::encode_fixed_size(&exec_index_needed);
self.hb.add_leaf(Nibbles::unpack(&index_buffer), &value);
self.next_insert_i += 1;
}
}
/// Returns `true` if all items have been pushed.
#[inline]
pub const fn is_complete(&self) -> bool {
self.received == self.len
}
/// Returns the number of items pushed so far.
#[inline]
pub const fn pushed_count(&self) -> usize {
self.received
}
/// Returns the expected total number of items.
#[inline]
pub const fn expected_count(&self) -> usize {
self.len
}
/// Finalizes the builder and returns the trie root.
///
/// # Errors
///
/// Returns [`OrderedRootError::Incomplete`] if not all items have been pushed.
pub fn finalize(mut self) -> Result<B256, OrderedRootError> {
if self.len == 0 {
return Ok(EMPTY_ROOT_HASH);
}
if self.received != self.len {
return Err(OrderedRootError::Incomplete {
expected: self.len,
received: self.received,
});
}
debug_assert_eq!(self.next_insert_i, self.len, "not all items were flushed");
Ok(self.hb.root())
}
}
#[cfg(test)]
mod tests {
use super::*;
use alloy_trie::root::ordered_trie_root_encoded;
#[test]
fn test_ordered_encoded_builder_equivalence() {
for len in [0, 1, 2, 3, 10, 127, 128, 129, 130, 200] {
let items: Vec<Vec<u8>> =
(0..len).map(|i| format!("item_{i}_data").into_bytes()).collect();
let expected = ordered_trie_root_encoded(&items);
let mut builder = OrderedTrieRootEncodedBuilder::new(len);
for (i, item) in items.iter().enumerate() {
builder.push(i, item).unwrap();
}
let actual = builder.finalize().unwrap();
assert_eq!(
expected, actual,
"mismatch for len={len}: expected {expected:?}, got {actual:?}"
);
}
}
#[test]
fn test_ordered_builder_out_of_order() {
for len in [2, 3, 5, 10, 50] {
let items: Vec<Vec<u8>> =
(0..len).map(|i| format!("item_{i}_data").into_bytes()).collect();
let expected = ordered_trie_root_encoded(&items);
// Push in reverse order
let mut builder = OrderedTrieRootEncodedBuilder::new(len);
for i in (0..len).rev() {
builder.push(i, &items[i]).unwrap();
}
let actual = builder.finalize().unwrap();
assert_eq!(expected, actual, "mismatch for reverse order len={len}");
// Push odds first, then evens
let mut builder = OrderedTrieRootEncodedBuilder::new(len);
for i in (1..len).step_by(2) {
builder.push(i, &items[i]).unwrap();
}
for i in (0..len).step_by(2) {
builder.push(i, &items[i]).unwrap();
}
let actual = builder.finalize().unwrap();
assert_eq!(expected, actual, "mismatch for odd/even order len={len}");
}
}
#[test]
fn test_ordered_builder_empty() {
let builder = OrderedTrieRootEncodedBuilder::new(0);
assert!(builder.is_complete());
assert_eq!(builder.finalize().unwrap(), EMPTY_ROOT_HASH);
}
#[test]
fn test_ordered_builder_incomplete_error() {
let mut builder = OrderedTrieRootEncodedBuilder::new(3);
builder.push(0, b"item_0").unwrap();
builder.push(1, b"item_1").unwrap();
assert!(!builder.is_complete());
assert_eq!(
builder.finalize(),
Err(OrderedRootError::Incomplete { expected: 3, received: 2 })
);
}
#[test]
fn test_ordered_builder_index_errors() {
let mut builder = OrderedTrieRootEncodedBuilder::new(2);
assert_eq!(
builder.push(5, b"item"),
Err(OrderedRootError::IndexOutOfBounds { index: 5, len: 2 })
);
builder.push(0, b"item_0").unwrap();
assert_eq!(
builder.push(0, b"item_0_dup"),
Err(OrderedRootError::DuplicateIndex { index: 0 })
);
builder.push(1, b"item_1").unwrap();
assert!(builder.is_complete());
}
}

View File

@@ -277,7 +277,7 @@ fn run_case(
.map_err(|err| Error::block_failed(block_number, program_inputs.clone(), err))?;
// Consensus checks after block execution
validate_block_post_execution(block, &chain_spec, &output.receipts, &output.requests)
validate_block_post_execution(block, &chain_spec, &output.receipts, &output.requests, None)
.map_err(|err| Error::block_failed(block_number, program_inputs.clone(), err))?;
// Generate the stateless witness