diff --git a/Cargo.lock b/Cargo.lock index cf087d7427..17e53c8137 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8506,6 +8506,7 @@ dependencies = [ "reth-testing-utils", "reth-tracing", "reth-trie", + "reth-trie-common", "reth-trie-db", "reth-trie-parallel", "reth-trie-sparse", diff --git a/crates/cli/commands/src/re_execute.rs b/crates/cli/commands/src/re_execute.rs index 41b6afdbc3..742b6ce76d 100644 --- a/crates/cli/commands/src/re_execute.rs +++ b/crates/cli/commands/src/re_execute.rs @@ -152,7 +152,7 @@ impl }; if let Err(err) = consensus - .validate_block_post_execution(&block, &result) + .validate_block_post_execution(&block, &result, None) .wrap_err_with(|| { format!("Failed to validate block {} {}", block.number(), block.hash()) }) diff --git a/crates/consensus/consensus/src/lib.rs b/crates/consensus/consensus/src/lib.rs index 1911b095c3..319a7a0ffb 100644 --- a/crates/consensus/consensus/src/lib.rs +++ b/crates/consensus/consensus/src/lib.rs @@ -15,6 +15,12 @@ use alloc::{boxed::Box, fmt::Debug, string::String, sync::Arc, vec::Vec}; use alloy_consensus::Header; use alloy_primitives::{BlockHash, BlockNumber, Bloom, B256}; use core::error::Error; + +/// Pre-computed receipt root and logs bloom. +/// +/// When provided to [`FullConsensus::validate_block_post_execution`], this allows skipping +/// the receipt root computation and using the pre-computed values instead. +pub type ReceiptRootBloom = (B256, Bloom); use reth_execution_types::BlockExecutionResult; use reth_primitives_traits::{ constants::{GAS_LIMIT_BOUND_DIVISOR, MAXIMUM_GAS_LIMIT_BLOCK, MINIMUM_GAS_LIMIT}, @@ -39,11 +45,15 @@ pub trait FullConsensus: Consensus { /// /// See the Yellow Paper sections 4.3.2 "Holistic Validity". /// + /// If `receipt_root_bloom` is provided, the implementation should use the pre-computed + /// receipt root and logs bloom instead of computing them from the receipts. + /// /// Note: validating blocks does not include other validations of the Consensus fn validate_block_post_execution( &self, block: &RecoveredBlock, result: &BlockExecutionResult, + receipt_root_bloom: Option, ) -> Result<(), ConsensusError>; } diff --git a/crates/consensus/consensus/src/noop.rs b/crates/consensus/consensus/src/noop.rs index 3e3341769d..08fe08e96e 100644 --- a/crates/consensus/consensus/src/noop.rs +++ b/crates/consensus/consensus/src/noop.rs @@ -18,7 +18,7 @@ //! //! **Not for production use** - provides no security guarantees or consensus validation. -use crate::{Consensus, ConsensusError, FullConsensus, HeaderValidator}; +use crate::{Consensus, ConsensusError, FullConsensus, HeaderValidator, ReceiptRootBloom}; use alloc::sync::Arc; use reth_execution_types::BlockExecutionResult; use reth_primitives_traits::{Block, NodePrimitives, RecoveredBlock, SealedBlock, SealedHeader}; @@ -76,6 +76,7 @@ impl FullConsensus for NoopConsensus { &self, _block: &RecoveredBlock, _result: &BlockExecutionResult, + _receipt_root_bloom: Option, ) -> Result<(), ConsensusError> { Ok(()) } diff --git a/crates/consensus/consensus/src/test_utils.rs b/crates/consensus/consensus/src/test_utils.rs index 94a178abde..b2a1fc71f0 100644 --- a/crates/consensus/consensus/src/test_utils.rs +++ b/crates/consensus/consensus/src/test_utils.rs @@ -1,4 +1,4 @@ -use crate::{Consensus, ConsensusError, FullConsensus, HeaderValidator}; +use crate::{Consensus, ConsensusError, FullConsensus, HeaderValidator, ReceiptRootBloom}; use core::sync::atomic::{AtomicBool, Ordering}; use reth_execution_types::BlockExecutionResult; use reth_primitives_traits::{Block, NodePrimitives, RecoveredBlock, SealedBlock, SealedHeader}; @@ -51,6 +51,7 @@ impl FullConsensus for TestConsensus { &self, _block: &RecoveredBlock, _result: &BlockExecutionResult, + _receipt_root_bloom: Option, ) -> Result<(), ConsensusError> { if self.fail_validation() { Err(ConsensusError::BaseFeeMissing) diff --git a/crates/engine/tree/Cargo.toml b/crates/engine/tree/Cargo.toml index 006233c190..50122c10ff 100644 --- a/crates/engine/tree/Cargo.toml +++ b/crates/engine/tree/Cargo.toml @@ -34,6 +34,7 @@ reth-trie-parallel.workspace = true reth-trie-sparse = { workspace = true, features = ["std", "metrics"] } reth-trie-sparse-parallel = { workspace = true, features = ["std"] } reth-trie.workspace = true +reth-trie-common.workspace = true reth-trie-db.workspace = true # alloy @@ -134,6 +135,7 @@ test-utils = [ "reth-static-file", "reth-tracing", "reth-trie/test-utils", + "reth-trie-common/test-utils", "reth-trie-db/test-utils", "reth-trie-sparse/test-utils", "reth-prune-types?/test-utils", diff --git a/crates/engine/tree/src/tree/metrics.rs b/crates/engine/tree/src/tree/metrics.rs index 0e9685c091..a11064ebd5 100644 --- a/crates/engine/tree/src/tree/metrics.rs +++ b/crates/engine/tree/src/tree/metrics.rs @@ -60,16 +60,22 @@ impl EngineApiMetrics { /// /// This method updates metrics for execution time, gas usage, and the number /// of accounts, storage slots and bytecodes loaded and updated. - pub(crate) fn execute_metered( + /// + /// The optional `on_receipt` callback is invoked after each transaction with the receipt + /// index and a reference to all receipts collected so far. This allows callers to stream + /// receipts to a background task for incremental receipt root computation. + pub(crate) fn execute_metered( &self, executor: E, mut transactions: impl Iterator, BlockExecutionError>>, transaction_count: usize, state_hook: Box, + mut on_receipt: F, ) -> Result<(BlockExecutionOutput, Vec
), BlockExecutionError> where DB: alloy_evm::Database, E: BlockExecutor>>, Transaction: SignedTransaction>, + F: FnMut(&[E::Receipt]), { // clone here is cheap, all the metrics are Option>. additionally // they are globally registered so that the data recorded in the hook will @@ -103,6 +109,9 @@ impl EngineApiMetrics { let gas_used = executor.execute_transaction(tx)?; self.executor.transaction_execution_histogram.record(start.elapsed()); + // Invoke callback with the latest receipt + on_receipt(executor.receipts()); + // record the tx gas used enter.record("gas_used", gas_used); } @@ -536,11 +545,12 @@ mod tests { let executor = MockExecutor::new(state); // This will fail to create the EVM but should still call the hook - let _result = metrics.execute_metered::<_, EmptyDB>( + let _result = metrics.execute_metered::<_, EmptyDB, _>( executor, input.clone_transactions_recovered().map(Ok::<_, BlockExecutionError>), input.transaction_count(), state_hook, + |_| {}, ); // Check if hook was called (it might not be if finish() fails early) @@ -595,11 +605,12 @@ mod tests { let executor = MockExecutor::new(state); // Execute (will fail but should still update some metrics) - let _result = metrics.execute_metered::<_, EmptyDB>( + let _result = metrics.execute_metered::<_, EmptyDB, _>( executor, input.clone_transactions_recovered().map(Ok::<_, BlockExecutionError>), input.transaction_count(), state_hook, + |_| {}, ); let snapshot = snapshotter.snapshot().into_vec(); diff --git a/crates/engine/tree/src/tree/payload_processor/mod.rs b/crates/engine/tree/src/tree/payload_processor/mod.rs index f2147ce93b..ed179afa8b 100644 --- a/crates/engine/tree/src/tree/payload_processor/mod.rs +++ b/crates/engine/tree/src/tree/payload_processor/mod.rs @@ -61,6 +61,7 @@ mod configured_sparse_trie; pub mod executor; pub mod multiproof; pub mod prewarm; +pub mod receipt_root_task; pub mod sparse_trie; use configured_sparse_trie::ConfiguredSparseTrie; diff --git a/crates/engine/tree/src/tree/payload_processor/receipt_root_task.rs b/crates/engine/tree/src/tree/payload_processor/receipt_root_task.rs new file mode 100644 index 0000000000..c9e53a11b0 --- /dev/null +++ b/crates/engine/tree/src/tree/payload_processor/receipt_root_task.rs @@ -0,0 +1,250 @@ +//! Receipt root computation in a background task. +//! +//! This module provides a streaming receipt root builder that computes the receipt trie root +//! in a background thread. Receipts are sent via a channel with their index, and for each +//! receipt received, the builder incrementally flushes leaves to the underlying +//! [`OrderedTrieRootEncodedBuilder`] when possible. When the channel closes, the task returns the +//! computed root. + +use alloy_eips::Encodable2718; +use alloy_primitives::{Bloom, B256}; +use crossbeam_channel::Receiver; +use reth_primitives_traits::Receipt; +use reth_trie_common::ordered_root::OrderedTrieRootEncodedBuilder; +use tokio::sync::oneshot; + +/// Receipt with index, ready to be sent to the background task for encoding and trie building. +#[derive(Debug, Clone)] +pub struct IndexedReceipt { + /// The transaction index within the block. + pub index: usize, + /// The receipt. + pub receipt: R, +} + +impl IndexedReceipt { + /// Creates a new indexed receipt. + #[inline] + pub const fn new(index: usize, receipt: R) -> Self { + Self { index, receipt } + } +} + +/// Handle for running the receipt root computation in a background task. +/// +/// This struct holds the channels needed to receive receipts and send the result. +/// Use [`Self::run`] to execute the computation (typically in a spawned blocking task). +#[derive(Debug)] +pub struct ReceiptRootTaskHandle { + /// Receiver for indexed receipts. + receipt_rx: Receiver>, + /// Sender for the computed result. + result_tx: oneshot::Sender<(B256, Bloom)>, +} + +impl ReceiptRootTaskHandle { + /// Creates a new handle from the receipt receiver and result sender channels. + pub const fn new( + receipt_rx: Receiver>, + result_tx: oneshot::Sender<(B256, Bloom)>, + ) -> Self { + Self { receipt_rx, result_tx } + } + + /// Runs the receipt root computation, consuming the handle. + /// + /// This method receives indexed receipts from the channel, encodes them, + /// and builds the trie incrementally. When all receipts have been received + /// (channel closed), it sends the result through the oneshot channel. + /// + /// This is designed to be called inside a blocking task (e.g., via + /// `executor.spawn_blocking(move || handle.run(receipts_len))`). + /// + /// # Arguments + /// + /// * `receipts_len` - The total number of receipts expected. This is needed to correctly order + /// the trie keys according to RLP encoding rules. + /// + /// # Panics + /// + /// Panics if the number of receipts received doesn't match `receipts_len`. + pub fn run(self, receipts_len: usize) { + let mut builder = OrderedTrieRootEncodedBuilder::new(receipts_len); + let mut aggregated_bloom = Bloom::ZERO; + let mut encode_buf = Vec::new(); + + for indexed_receipt in self.receipt_rx { + let receipt_with_bloom = indexed_receipt.receipt.with_bloom_ref(); + + encode_buf.clear(); + receipt_with_bloom.encode_2718(&mut encode_buf); + + aggregated_bloom |= *receipt_with_bloom.bloom_ref(); + builder.push_unchecked(indexed_receipt.index, &encode_buf); + } + + let root = builder.finalize().expect("receipt root builder incomplete"); + let _ = self.result_tx.send((root, aggregated_bloom)); + } +} + +#[cfg(test)] +mod tests { + use super::*; + use alloy_consensus::{proofs::calculate_receipt_root, TxReceipt}; + use alloy_primitives::{b256, hex, Address, Bytes, Log}; + use crossbeam_channel::bounded; + use reth_ethereum_primitives::{Receipt, TxType}; + + #[tokio::test] + async fn test_receipt_root_task_empty() { + let (_tx, rx) = bounded::>(1); + let (result_tx, result_rx) = oneshot::channel(); + drop(_tx); + + let handle = ReceiptRootTaskHandle::new(rx, result_tx); + tokio::task::spawn_blocking(move || handle.run(0)).await.unwrap(); + + let (root, bloom) = result_rx.await.unwrap(); + + // Empty trie root + assert_eq!(root, reth_trie_common::EMPTY_ROOT_HASH); + assert_eq!(bloom, Bloom::ZERO); + } + + #[tokio::test] + async fn test_receipt_root_task_single_receipt() { + let receipts: Vec = vec![Receipt::default()]; + + let (tx, rx) = bounded(1); + let (result_tx, result_rx) = oneshot::channel(); + let receipts_len = receipts.len(); + + let handle = ReceiptRootTaskHandle::new(rx, result_tx); + let join_handle = tokio::task::spawn_blocking(move || handle.run(receipts_len)); + + for (i, receipt) in receipts.clone().into_iter().enumerate() { + tx.send(IndexedReceipt::new(i, receipt)).unwrap(); + } + drop(tx); + + join_handle.await.unwrap(); + let (root, _bloom) = result_rx.await.unwrap(); + + // Verify against the standard calculation + let receipts_with_bloom: Vec<_> = receipts.iter().map(|r| r.with_bloom_ref()).collect(); + let expected_root = calculate_receipt_root(&receipts_with_bloom); + + assert_eq!(root, expected_root); + } + + #[tokio::test] + async fn test_receipt_root_task_multiple_receipts() { + let receipts: Vec = vec![Receipt::default(); 5]; + + let (tx, rx) = bounded(4); + let (result_tx, result_rx) = oneshot::channel(); + let receipts_len = receipts.len(); + + let handle = ReceiptRootTaskHandle::new(rx, result_tx); + let join_handle = tokio::task::spawn_blocking(move || handle.run(receipts_len)); + + for (i, receipt) in receipts.into_iter().enumerate() { + tx.send(IndexedReceipt::new(i, receipt)).unwrap(); + } + drop(tx); + + join_handle.await.unwrap(); + let (root, bloom) = result_rx.await.unwrap(); + + // Verify against expected values from existing test + assert_eq!( + root, + b256!("0x61353b4fb714dc1fccacbf7eafc4273e62f3d1eed716fe41b2a0cd2e12c63ebc") + ); + assert_eq!( + bloom, + Bloom::from(hex!("00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000")) + ); + } + + #[tokio::test] + async fn test_receipt_root_matches_standard_calculation() { + // Create some receipts with actual data + let receipts = vec![ + Receipt { + tx_type: TxType::Legacy, + cumulative_gas_used: 21000, + success: true, + logs: vec![], + }, + Receipt { + tx_type: TxType::Eip1559, + cumulative_gas_used: 42000, + success: true, + logs: vec![Log { + address: Address::ZERO, + data: alloy_primitives::LogData::new_unchecked(vec![B256::ZERO], Bytes::new()), + }], + }, + Receipt { + tx_type: TxType::Eip2930, + cumulative_gas_used: 63000, + success: false, + logs: vec![], + }, + ]; + + // Calculate expected values first (before we move receipts) + let receipts_with_bloom: Vec<_> = receipts.iter().map(|r| r.with_bloom_ref()).collect(); + let expected_root = calculate_receipt_root(&receipts_with_bloom); + let expected_bloom = + receipts_with_bloom.iter().fold(Bloom::ZERO, |bloom, r| bloom | r.bloom_ref()); + + // Calculate using the task + let (tx, rx) = bounded(4); + let (result_tx, result_rx) = oneshot::channel(); + let receipts_len = receipts.len(); + + let handle = ReceiptRootTaskHandle::new(rx, result_tx); + let join_handle = tokio::task::spawn_blocking(move || handle.run(receipts_len)); + + for (i, receipt) in receipts.into_iter().enumerate() { + tx.send(IndexedReceipt::new(i, receipt)).unwrap(); + } + drop(tx); + + join_handle.await.unwrap(); + let (task_root, task_bloom) = result_rx.await.unwrap(); + + assert_eq!(task_root, expected_root); + assert_eq!(task_bloom, expected_bloom); + } + + #[tokio::test] + async fn test_receipt_root_task_out_of_order() { + let receipts: Vec = vec![Receipt::default(); 5]; + + // Calculate expected values first (before we move receipts) + let receipts_with_bloom: Vec<_> = receipts.iter().map(|r| r.with_bloom_ref()).collect(); + let expected_root = calculate_receipt_root(&receipts_with_bloom); + + let (tx, rx) = bounded(4); + let (result_tx, result_rx) = oneshot::channel(); + let receipts_len = receipts.len(); + + let handle = ReceiptRootTaskHandle::new(rx, result_tx); + let join_handle = tokio::task::spawn_blocking(move || handle.run(receipts_len)); + + // Send in reverse order to test out-of-order handling + for (i, receipt) in receipts.into_iter().enumerate().rev() { + tx.send(IndexedReceipt::new(i, receipt)).unwrap(); + } + drop(tx); + + join_handle.await.unwrap(); + let (root, _bloom) = result_rx.await.unwrap(); + + assert_eq!(root, expected_root); + } +} diff --git a/crates/engine/tree/src/tree/payload_validator.rs b/crates/engine/tree/src/tree/payload_validator.rs index 6575463f0c..573962a8e5 100644 --- a/crates/engine/tree/src/tree/payload_validator.rs +++ b/crates/engine/tree/src/tree/payload_validator.rs @@ -15,9 +15,11 @@ use alloy_eip7928::BlockAccessList; use alloy_eips::{eip1898::BlockWithParent, NumHash}; use alloy_evm::Evm; use alloy_primitives::B256; + +use crate::tree::payload_processor::receipt_root_task::{IndexedReceipt, ReceiptRootTaskHandle}; use rayon::prelude::*; use reth_chain_state::{CanonicalInMemoryState, DeferredTrieData, ExecutedBlock, LazyOverlay}; -use reth_consensus::{ConsensusError, FullConsensus}; +use reth_consensus::{ConsensusError, FullConsensus, ReceiptRootBloom}; use reth_engine_primitives::{ ConfigureEngineEvm, ExecutableTxIterator, ExecutionPayload, InvalidBlockHook, PayloadValidator, }; @@ -453,11 +455,14 @@ where state_provider = Box::new(InstrumentedStateProvider::new(state_provider, "engine")); } - // Execute the block and handle any execution errors - let (output, senders) = match self.execute_block(state_provider, env, &input, &mut handle) { - Ok(output) => output, - Err(err) => return self.handle_execution_error(input, err, &parent_block), - }; + // Execute the block and handle any execution errors. + // The receipt root task is spawned before execution and receives receipts incrementally + // as transactions complete, allowing parallel computation during execution. + let (output, senders, receipt_root_rx) = + match self.execute_block(state_provider, env, &input, &mut handle) { + Ok(output) => output, + Err(err) => return self.handle_execution_error(input, err, &parent_block), + }; // After executing the block we can stop prewarming transactions handle.stop_prewarming_execution(); @@ -473,8 +478,21 @@ where let block = self.convert_to_block(input)?.with_senders(senders); + // Wait for the receipt root computation to complete. + let receipt_root_bloom = Some( + receipt_root_rx + .blocking_recv() + .expect("receipt root task dropped sender without result"), + ); + let hashed_state = ensure_ok_post_block!( - self.validate_post_execution(&block, &parent_block, &output, &mut ctx), + self.validate_post_execution( + &block, + &parent_block, + &output, + &mut ctx, + receipt_root_bloom + ), block ); @@ -622,13 +640,21 @@ where /// Executes a block with the given state provider #[instrument(level = "debug", target = "engine::tree::payload_validator", skip_all)] + #[expect(clippy::type_complexity)] fn execute_block( &mut self, state_provider: S, env: ExecutionEnv, input: &BlockOrPayload, handle: &mut PayloadHandle, Err, N::Receipt>, - ) -> Result<(BlockExecutionOutput, Vec
), InsertBlockErrorKind> + ) -> Result< + ( + BlockExecutionOutput, + Vec
, + tokio::sync::oneshot::Receiver<(B256, alloy_primitives::Bloom)>, + ), + InsertBlockErrorKind, + > where S: StateProvider + Send, Err: core::error::Error + Send + Sync + 'static, @@ -667,6 +693,14 @@ where }); } + // Spawn background task to compute receipt root and logs bloom incrementally. + // Unbounded channel is used since tx count bounds capacity anyway (max ~30k txs per block). + let receipts_len = input.transaction_count(); + let (receipt_tx, receipt_rx) = crossbeam_channel::unbounded(); + let (result_tx, result_rx) = tokio::sync::oneshot::channel(); + let task_handle = ReceiptRootTaskHandle::new(receipt_rx, result_tx); + self.payload_processor.executor().spawn_blocking(move || task_handle.run(receipts_len)); + let execution_start = Instant::now(); let state_hook = Box::new(handle.state_hook()); let (output, senders) = self.metrics.execute_metered( @@ -674,11 +708,22 @@ where handle.iter_transactions().map(|res| res.map_err(BlockExecutionError::other)), input.transaction_count(), state_hook, + |receipts| { + // Send the latest receipt to the background task for incremental root computation. + // The receipt is cloned here; encoding happens in the background thread. + if let Some(receipt) = receipts.last() { + // Infer tx_index from the number of receipts collected so far + let tx_index = receipts.len() - 1; + let _ = receipt_tx.send(IndexedReceipt::new(tx_index, receipt.clone())); + } + }, )?; + drop(receipt_tx); + let execution_finish = Instant::now(); let execution_time = execution_finish.duration_since(execution_start); debug!(target: "engine::tree::payload_validator", elapsed = ?execution_time, "Executed block"); - Ok((output, senders)) + Ok((output, senders, result_rx)) } /// Compute state root for the given hashed post state in parallel. @@ -736,6 +781,9 @@ where /// - parent header validation /// - post-execution consensus validation /// - state-root based post-execution validation + /// + /// If `receipt_root_bloom` is provided, it will be used instead of computing the receipt root + /// and logs bloom from the receipts. #[instrument(level = "debug", target = "engine::tree::payload_validator", skip_all)] fn validate_post_execution>>( &self, @@ -743,6 +791,7 @@ where parent_block: &SealedHeader, output: &BlockExecutionOutput, ctx: &mut TreeCtx<'_, N>, + receipt_root_bloom: Option, ) -> Result where V: PayloadValidator, @@ -769,7 +818,9 @@ where let _enter = debug_span!(target: "engine::tree::payload_validator", "validate_block_post_execution") .entered(); - if let Err(err) = self.consensus.validate_block_post_execution(block, output) { + if let Err(err) = + self.consensus.validate_block_post_execution(block, output, receipt_root_bloom) + { // call post-block hook self.on_invalid_block(parent_block, block, output, None, ctx.state_mut()); return Err(err.into()) diff --git a/crates/ethereum/consensus/src/lib.rs b/crates/ethereum/consensus/src/lib.rs index ccbbb036ff..fec4f21b9f 100644 --- a/crates/ethereum/consensus/src/lib.rs +++ b/crates/ethereum/consensus/src/lib.rs @@ -15,7 +15,7 @@ use alloc::{fmt::Debug, sync::Arc}; use alloy_consensus::{constants::MAXIMUM_EXTRA_DATA_SIZE, EMPTY_OMMER_ROOT_HASH}; use alloy_eips::eip7840::BlobParams; use reth_chainspec::{EthChainSpec, EthereumHardforks}; -use reth_consensus::{Consensus, ConsensusError, FullConsensus, HeaderValidator}; +use reth_consensus::{Consensus, ConsensusError, FullConsensus, HeaderValidator, ReceiptRootBloom}; use reth_consensus_common::validation::{ validate_4844_header_standalone, validate_against_parent_4844, validate_against_parent_eip1559_base_fee, validate_against_parent_gas_limit, @@ -74,8 +74,15 @@ where &self, block: &RecoveredBlock, result: &BlockExecutionResult, + receipt_root_bloom: Option, ) -> Result<(), ConsensusError> { - validate_block_post_execution(block, &self.chain_spec, &result.receipts, &result.requests) + validate_block_post_execution( + block, + &self.chain_spec, + &result.receipts, + &result.requests, + receipt_root_bloom, + ) } } diff --git a/crates/ethereum/consensus/src/validation.rs b/crates/ethereum/consensus/src/validation.rs index 055977b517..693d6ce002 100644 --- a/crates/ethereum/consensus/src/validation.rs +++ b/crates/ethereum/consensus/src/validation.rs @@ -12,11 +12,15 @@ use reth_primitives_traits::{ /// /// - Compares the receipts root in the block header to the block body /// - Compares the gas used in the block header to the actual gas usage after execution +/// +/// If `receipt_root_bloom` is provided, the pre-computed receipt root and logs bloom are used +/// instead of computing them from the receipts. pub fn validate_block_post_execution( block: &RecoveredBlock, chain_spec: &ChainSpec, receipts: &[R], requests: &Requests, + receipt_root_bloom: Option<(B256, Bloom)>, ) -> Result<(), ConsensusError> where B: Block, @@ -37,19 +41,26 @@ where // operation as hashing that is required for state root got calculated in every // transaction This was replaced with is_success flag. // See more about EIP here: https://eips.ethereum.org/EIPS/eip-658 - if chain_spec.is_byzantium_active_at_block(block.header().number()) && - let Err(error) = verify_receipts( - block.header().receipts_root(), - block.header().logs_bloom(), - receipts, - ) - { - let receipts = receipts - .iter() - .map(|r| Bytes::from(r.with_bloom_ref().encoded_2718())) - .collect::>(); - tracing::debug!(%error, ?receipts, "receipts verification failed"); - return Err(error) + if chain_spec.is_byzantium_active_at_block(block.header().number()) { + let result = if let Some((receipts_root, logs_bloom)) = receipt_root_bloom { + compare_receipts_root_and_logs_bloom( + receipts_root, + logs_bloom, + block.header().receipts_root(), + block.header().logs_bloom(), + ) + } else { + verify_receipts(block.header().receipts_root(), block.header().logs_bloom(), receipts) + }; + + if let Err(error) = result { + let receipts = receipts + .iter() + .map(|r| Bytes::from(r.with_bloom_ref().encoded_2718())) + .collect::>(); + tracing::debug!(%error, ?receipts, "receipts verification failed"); + return Err(error) + } } // Validate that the header requests hash matches the calculated requests hash diff --git a/crates/optimism/consensus/src/lib.rs b/crates/optimism/consensus/src/lib.rs index 8be804db45..1d3cb421c4 100644 --- a/crates/optimism/consensus/src/lib.rs +++ b/crates/optimism/consensus/src/lib.rs @@ -18,7 +18,7 @@ use alloy_consensus::{ use alloy_primitives::B64; use core::fmt::Debug; use reth_chainspec::EthChainSpec; -use reth_consensus::{Consensus, ConsensusError, FullConsensus, HeaderValidator}; +use reth_consensus::{Consensus, ConsensusError, FullConsensus, HeaderValidator, ReceiptRootBloom}; use reth_consensus_common::validation::{ validate_against_parent_eip1559_base_fee, validate_against_parent_hash_number, validate_against_parent_timestamp, validate_cancun_gas, validate_header_base_fee, @@ -79,8 +79,9 @@ where &self, block: &RecoveredBlock, result: &BlockExecutionResult, + receipt_root_bloom: Option, ) -> Result<(), ConsensusError> { - validate_block_post_execution(block.header(), &self.chain_spec, result) + validate_block_post_execution(block.header(), &self.chain_spec, result, receipt_root_bloom) } } @@ -410,7 +411,8 @@ mod tests { let post_execution = as FullConsensus>::validate_block_post_execution( &beacon_consensus, &block, - &result + &result, + None, ); // validate blob, it should pass blob gas used validation @@ -479,7 +481,8 @@ mod tests { let post_execution = as FullConsensus>::validate_block_post_execution( &beacon_consensus, &block, - &result + &result, + None, ); // validate blob, it should fail blob gas used validation post execution. diff --git a/crates/optimism/consensus/src/validation/mod.rs b/crates/optimism/consensus/src/validation/mod.rs index 50c45f7172..2168548608 100644 --- a/crates/optimism/consensus/src/validation/mod.rs +++ b/crates/optimism/consensus/src/validation/mod.rs @@ -85,10 +85,14 @@ where /// /// - Compares the receipts root in the block header to the block body /// - Compares the gas used in the block header to the actual gas usage after execution +/// +/// If `receipt_root_bloom` is provided, the pre-computed receipt root and logs bloom are used +/// instead of computing them from the receipts. pub fn validate_block_post_execution( header: impl BlockHeader, chain_spec: impl OpHardforks, result: &BlockExecutionResult, + receipt_root_bloom: Option<(B256, Bloom)>, ) -> Result<(), ConsensusError> { // Validate that the blob gas used is present and correctly computed if Jovian is active. if chain_spec.is_jovian_active_at_timestamp(header.timestamp()) { @@ -110,21 +114,32 @@ pub fn validate_block_post_execution( // operation as hashing that is required for state root got calculated in every // transaction This was replaced with is_success flag. // See more about EIP here: https://eips.ethereum.org/EIPS/eip-658 - if chain_spec.is_byzantium_active_at_block(header.number()) && - let Err(error) = verify_receipts_optimism( - header.receipts_root(), - header.logs_bloom(), - receipts, - chain_spec, - header.timestamp(), - ) - { - let receipts = receipts - .iter() - .map(|r| Bytes::from(r.with_bloom_ref().encoded_2718())) - .collect::>(); - tracing::debug!(%error, ?receipts, "receipts verification failed"); - return Err(error) + if chain_spec.is_byzantium_active_at_block(header.number()) { + let result = if let Some((receipts_root, logs_bloom)) = receipt_root_bloom { + compare_receipts_root_and_logs_bloom( + receipts_root, + logs_bloom, + header.receipts_root(), + header.logs_bloom(), + ) + } else { + verify_receipts_optimism( + header.receipts_root(), + header.logs_bloom(), + receipts, + chain_spec, + header.timestamp(), + ) + }; + + if let Err(error) = result { + let receipts = receipts + .iter() + .map(|r| Bytes::from(r.with_bloom_ref().encoded_2718())) + .collect::>(); + tracing::debug!(%error, ?receipts, "receipts verification failed"); + return Err(error) + } } // Check if gas used matches the value set in header. @@ -543,7 +558,7 @@ mod tests { requests: Requests::default(), gas_used: GAS_USED, }; - validate_block_post_execution(&header, &chainspec, &result).unwrap(); + validate_block_post_execution(&header, &chainspec, &result, None).unwrap(); } #[test] @@ -565,7 +580,7 @@ mod tests { gas_used: GAS_USED, }; assert!(matches!( - validate_block_post_execution(&header, &chainspec, &result).unwrap_err(), + validate_block_post_execution(&header, &chainspec, &result, None).unwrap_err(), ConsensusError::BlobGasUsedDiff(diff) if diff.got == BLOB_GAS_USED && diff.expected == BLOB_GAS_USED + 1 )); diff --git a/crates/rpc/rpc/src/validation.rs b/crates/rpc/rpc/src/validation.rs index 78185f92ff..73b2c68e2b 100644 --- a/crates/rpc/rpc/src/validation.rs +++ b/crates/rpc/rpc/src/validation.rs @@ -201,7 +201,7 @@ where // update the cached reads self.update_cached_reads(parent_header_hash, request_cache).await; - self.consensus.validate_block_post_execution(&block, &output)?; + self.consensus.validate_block_post_execution(&block, &output, None)?; self.ensure_payment(&block, &output, &message)?; diff --git a/crates/stages/stages/src/stages/execution.rs b/crates/stages/stages/src/stages/execution.rs index 593180926d..29adf3b2d3 100644 --- a/crates/stages/stages/src/stages/execution.rs +++ b/crates/stages/stages/src/stages/execution.rs @@ -351,7 +351,7 @@ where }) })?; - if let Err(err) = self.consensus.validate_block_post_execution(&block, &result) { + if let Err(err) = self.consensus.validate_block_post_execution(&block, &result, None) { return Err(StageError::Block { block: Box::new(block.block_with_parent()), error: BlockErrorKind::Validation(err), diff --git a/crates/stateless/src/validation.rs b/crates/stateless/src/validation.rs index 08d84f8466..a3a8ba7b2d 100644 --- a/crates/stateless/src/validation.rs +++ b/crates/stateless/src/validation.rs @@ -231,8 +231,14 @@ where .map_err(|e| StatelessValidationError::StatelessExecutionFailed(e.to_string()))?; // Post validation checks - validate_block_post_execution(¤t_block, &chain_spec, &output.receipts, &output.requests) - .map_err(StatelessValidationError::ConsensusValidationFailed)?; + validate_block_post_execution( + ¤t_block, + &chain_spec, + &output.receipts, + &output.requests, + None, + ) + .map_err(StatelessValidationError::ConsensusValidationFailed)?; // Compute and check the post state root let hashed_state = HashedPostState::from_bundle_state::(&output.state.state); diff --git a/crates/trie/common/src/lib.rs b/crates/trie/common/src/lib.rs index b478a126a5..8faa44622f 100644 --- a/crates/trie/common/src/lib.rs +++ b/crates/trie/common/src/lib.rs @@ -55,6 +55,9 @@ pub use proofs::*; pub mod root; +/// Incremental ordered trie root computation. +pub mod ordered_root; + /// Buffer for trie updates. pub mod updates; diff --git a/crates/trie/common/src/ordered_root.rs b/crates/trie/common/src/ordered_root.rs new file mode 100644 index 0000000000..ed02a369f7 --- /dev/null +++ b/crates/trie/common/src/ordered_root.rs @@ -0,0 +1,354 @@ +//! Incremental ordered trie root computation. +//! +//! This module provides builders for computing ordered trie roots incrementally as items +//! arrive, rather than requiring all items upfront. This is useful for receipt root +//! calculation during block execution, where we know the total count but receive receipts +//! one by one as transactions are executed. + +use crate::{HashBuilder, Nibbles, EMPTY_ROOT_HASH}; +use alloc::vec::Vec; +use alloy_primitives::B256; +use alloy_trie::root::adjust_index_for_rlp; +use core::fmt; + +/// Error returned when using [`OrderedTrieRootEncodedBuilder`]. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum OrderedRootError { + /// Called `finalize()` before all items were pushed. + Incomplete { + /// The expected number of items. + expected: usize, + /// The number of items received. + received: usize, + }, + /// Index is out of bounds. + IndexOutOfBounds { + /// The index that was provided. + index: usize, + /// The expected length. + len: usize, + }, + /// Item at this index was already pushed. + DuplicateIndex { + /// The duplicate index. + index: usize, + }, +} + +impl OrderedRootError { + /// Returns `true` if the error is [`OrderedRootError::Incomplete`]. + #[inline] + pub const fn is_incomplete(&self) -> bool { + matches!(self, Self::Incomplete { .. }) + } + + /// Returns `true` if the error is [`OrderedRootError::IndexOutOfBounds`]. + #[inline] + pub const fn is_index_out_of_bounds(&self) -> bool { + matches!(self, Self::IndexOutOfBounds { .. }) + } + + /// Returns `true` if the error is [`OrderedRootError::DuplicateIndex`]. + #[inline] + pub const fn is_duplicate_index(&self) -> bool { + matches!(self, Self::DuplicateIndex { .. }) + } + + /// Returns the index associated with the error, if any. + #[inline] + pub const fn index(&self) -> Option { + match self { + Self::Incomplete { .. } => None, + Self::IndexOutOfBounds { index, .. } | Self::DuplicateIndex { index } => Some(*index), + } + } +} + +impl fmt::Display for OrderedRootError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::Incomplete { expected, received } => { + write!(f, "incomplete: expected {expected} items, received {received}") + } + Self::IndexOutOfBounds { index, len } => { + write!(f, "index {index} out of bounds for length {len}") + } + Self::DuplicateIndex { index } => { + write!(f, "duplicate item at index {index}") + } + } + } +} + +#[cfg(feature = "std")] +impl std::error::Error for OrderedRootError {} + +/// A builder for computing ordered trie roots incrementally from pre-encoded items. +/// +/// This builder allows pushing items one by one as they become available +/// (e.g., receipts after each transaction execution), rather than requiring +/// all items upfront. +/// +/// # Use Case +/// +/// When executing a block, the receipt root must be computed from all transaction +/// receipts. With the standard `ordered_trie_root`, you must wait until all +/// transactions are executed before computing the root. This builder enables +/// **incremental computation** - you can start building the trie as soon as +/// receipts become available, potentially in parallel with continued execution. +/// +/// The builder requires knowing the total item count upfront (the number of +/// transactions in the block), but items can be pushed in any order by index. +/// +/// # How It Works +/// +/// Items can be pushed in any order by specifying their index. The builder +/// internally buffers items and flushes them to the underlying [`HashBuilder`] +/// in the correct order for RLP key encoding (as determined by [`adjust_index_for_rlp`]). +/// +/// # Memory +/// +/// Each pushed item is stored in an internal buffer until it can be flushed. +/// In the worst case (e.g., pushing index 0 last), all items except one will +/// be buffered. For receipt roots, index 0 is typically flushed late due to +/// RLP key ordering, so expect to buffer most items until near the end. +/// +/// # Example +/// +/// ``` +/// use reth_trie_common::ordered_root::OrderedTrieRootEncodedBuilder; +/// +/// // Create a builder for 2 pre-encoded items +/// let mut builder = OrderedTrieRootEncodedBuilder::new(2); +/// +/// // Push pre-encoded items as they arrive (can be out of order) +/// builder.push(1, b"encoded_item_1").unwrap(); +/// builder.push(0, b"encoded_item_0").unwrap(); +/// +/// // Finalize to get the root hash +/// let root = builder.finalize().unwrap(); +/// ``` +#[derive(Debug)] +pub struct OrderedTrieRootEncodedBuilder { + /// Total expected number of items. + len: usize, + /// Number of items received so far. + received: usize, + /// Next insertion loop counter (determines which adjusted index to flush next). + next_insert_i: usize, + /// Buffer for pending items, indexed by execution index. + pending: Vec>>, + /// The underlying hash builder. + hb: HashBuilder, +} + +impl OrderedTrieRootEncodedBuilder { + /// Creates a new builder for `len` pre-encoded items. + pub fn new(len: usize) -> Self { + Self { + len, + received: 0, + next_insert_i: 0, + pending: alloc::vec![None; len], + hb: HashBuilder::default(), + } + } + + /// Pushes a pre-encoded item at the given index to the builder. + /// + /// Items can be pushed in any order. The builder will automatically + /// flush items to the underlying [`HashBuilder`] when they become + /// available in the correct order. + /// + /// # Errors + /// + /// - [`OrderedRootError::IndexOutOfBounds`] if `index >= len` + /// - [`OrderedRootError::DuplicateIndex`] if an item was already pushed at this index + #[inline] + pub fn push(&mut self, index: usize, bytes: &[u8]) -> Result<(), OrderedRootError> { + if index >= self.len { + return Err(OrderedRootError::IndexOutOfBounds { index, len: self.len }); + } + + if self.pending[index].is_some() { + return Err(OrderedRootError::DuplicateIndex { index }); + } + + self.push_unchecked(index, bytes); + Ok(()) + } + + /// Pushes a pre-encoded item at the given index without bounds or duplicate checking. + /// + /// This is a performance-critical method for callers that can guarantee: + /// - `index < len` + /// - No item has been pushed at this index before + /// + /// # Panics + /// + /// Panics in debug mode if `index >= len`. + #[inline] + pub fn push_unchecked(&mut self, index: usize, bytes: &[u8]) { + debug_assert!(index < self.len, "index {index} out of bounds for length {}", self.len); + debug_assert!(self.pending[index].is_none(), "duplicate item at index {index}"); + + self.pending[index] = Some(bytes.to_vec()); + self.received += 1; + + self.flush(); + } + + /// Attempts to flush pending items to the hash builder. + fn flush(&mut self) { + while self.next_insert_i < self.len { + let exec_index_needed = adjust_index_for_rlp(self.next_insert_i, self.len); + + let Some(value) = self.pending[exec_index_needed].take() else { + break; + }; + + let index_buffer = alloy_rlp::encode_fixed_size(&exec_index_needed); + self.hb.add_leaf(Nibbles::unpack(&index_buffer), &value); + + self.next_insert_i += 1; + } + } + + /// Returns `true` if all items have been pushed. + #[inline] + pub const fn is_complete(&self) -> bool { + self.received == self.len + } + + /// Returns the number of items pushed so far. + #[inline] + pub const fn pushed_count(&self) -> usize { + self.received + } + + /// Returns the expected total number of items. + #[inline] + pub const fn expected_count(&self) -> usize { + self.len + } + + /// Finalizes the builder and returns the trie root. + /// + /// # Errors + /// + /// Returns [`OrderedRootError::Incomplete`] if not all items have been pushed. + pub fn finalize(mut self) -> Result { + if self.len == 0 { + return Ok(EMPTY_ROOT_HASH); + } + + if self.received != self.len { + return Err(OrderedRootError::Incomplete { + expected: self.len, + received: self.received, + }); + } + + debug_assert_eq!(self.next_insert_i, self.len, "not all items were flushed"); + + Ok(self.hb.root()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use alloy_trie::root::ordered_trie_root_encoded; + + #[test] + fn test_ordered_encoded_builder_equivalence() { + for len in [0, 1, 2, 3, 10, 127, 128, 129, 130, 200] { + let items: Vec> = + (0..len).map(|i| format!("item_{i}_data").into_bytes()).collect(); + + let expected = ordered_trie_root_encoded(&items); + + let mut builder = OrderedTrieRootEncodedBuilder::new(len); + + for (i, item) in items.iter().enumerate() { + builder.push(i, item).unwrap(); + } + + let actual = builder.finalize().unwrap(); + assert_eq!( + expected, actual, + "mismatch for len={len}: expected {expected:?}, got {actual:?}" + ); + } + } + + #[test] + fn test_ordered_builder_out_of_order() { + for len in [2, 3, 5, 10, 50] { + let items: Vec> = + (0..len).map(|i| format!("item_{i}_data").into_bytes()).collect(); + + let expected = ordered_trie_root_encoded(&items); + + // Push in reverse order + let mut builder = OrderedTrieRootEncodedBuilder::new(len); + for i in (0..len).rev() { + builder.push(i, &items[i]).unwrap(); + } + let actual = builder.finalize().unwrap(); + assert_eq!(expected, actual, "mismatch for reverse order len={len}"); + + // Push odds first, then evens + let mut builder = OrderedTrieRootEncodedBuilder::new(len); + for i in (1..len).step_by(2) { + builder.push(i, &items[i]).unwrap(); + } + for i in (0..len).step_by(2) { + builder.push(i, &items[i]).unwrap(); + } + let actual = builder.finalize().unwrap(); + assert_eq!(expected, actual, "mismatch for odd/even order len={len}"); + } + } + + #[test] + fn test_ordered_builder_empty() { + let builder = OrderedTrieRootEncodedBuilder::new(0); + assert!(builder.is_complete()); + assert_eq!(builder.finalize().unwrap(), EMPTY_ROOT_HASH); + } + + #[test] + fn test_ordered_builder_incomplete_error() { + let mut builder = OrderedTrieRootEncodedBuilder::new(3); + + builder.push(0, b"item_0").unwrap(); + builder.push(1, b"item_1").unwrap(); + + assert!(!builder.is_complete()); + assert_eq!( + builder.finalize(), + Err(OrderedRootError::Incomplete { expected: 3, received: 2 }) + ); + } + + #[test] + fn test_ordered_builder_index_errors() { + let mut builder = OrderedTrieRootEncodedBuilder::new(2); + + assert_eq!( + builder.push(5, b"item"), + Err(OrderedRootError::IndexOutOfBounds { index: 5, len: 2 }) + ); + + builder.push(0, b"item_0").unwrap(); + + assert_eq!( + builder.push(0, b"item_0_dup"), + Err(OrderedRootError::DuplicateIndex { index: 0 }) + ); + + builder.push(1, b"item_1").unwrap(); + assert!(builder.is_complete()); + } +} diff --git a/testing/ef-tests/src/cases/blockchain_test.rs b/testing/ef-tests/src/cases/blockchain_test.rs index 6d8dbc6827..8f26ac8dd0 100644 --- a/testing/ef-tests/src/cases/blockchain_test.rs +++ b/testing/ef-tests/src/cases/blockchain_test.rs @@ -277,7 +277,7 @@ fn run_case( .map_err(|err| Error::block_failed(block_number, program_inputs.clone(), err))?; // Consensus checks after block execution - validate_block_post_execution(block, &chain_spec, &output.receipts, &output.requests) + validate_block_post_execution(block, &chain_spec, &output.receipts, &output.requests, None) .map_err(|err| Error::block_failed(block_number, program_inputs.clone(), err))?; // Generate the stateless witness