mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-04-30 03:01:58 -04:00
Compare commits
3 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
aab08b82ea | ||
|
|
6d357545ef | ||
|
|
2740793e42 |
@@ -236,36 +236,33 @@ where
|
||||
|
||||
if let Some(saved_cache) = saved_cache {
|
||||
debug!(target: "engine::caching", parent_hash=?hash, "Updating execution cache");
|
||||
// Perform all cache operations atomically under the lock
|
||||
|
||||
// Detach the published cache so readers see None during the update.
|
||||
// This is necessary because ExecutionCache is Arc-shared: mutating
|
||||
// it via insert_state would be visible through the old SavedCache.
|
||||
execution_cache.update_with_guard(|cached| {
|
||||
// consumes the `SavedCache` held by the prewarming task, which releases its usage
|
||||
// guard
|
||||
let (caches, cache_metrics, disable_cache_metrics) = saved_cache.split();
|
||||
let new_cache = SavedCache::new(hash, caches, cache_metrics)
|
||||
.with_disable_cache_metrics(disable_cache_metrics);
|
||||
cached.take();
|
||||
});
|
||||
|
||||
// Insert state into cache while holding the lock
|
||||
// Access the BundleState through the shared ExecutionOutcome
|
||||
if new_cache.cache().insert_state(&execution_outcome.state).is_err() {
|
||||
// Clear the cache on error to prevent having a polluted cache
|
||||
*cached = None;
|
||||
debug!(target: "engine::caching", "cleared execution cache on update error");
|
||||
return;
|
||||
}
|
||||
let (caches, cache_metrics, disable_cache_metrics) = saved_cache.split();
|
||||
let new_cache = SavedCache::new(hash, caches, cache_metrics)
|
||||
.with_disable_cache_metrics(disable_cache_metrics);
|
||||
|
||||
if new_cache.cache().insert_state(&execution_outcome.state).is_err() {
|
||||
debug!(target: "engine::caching", "cleared execution cache on update error");
|
||||
} else {
|
||||
new_cache.update_metrics();
|
||||
|
||||
if valid_block_rx.recv().is_ok() {
|
||||
// Replace the shared cache with the new one; the previous cache (if any) is
|
||||
// dropped.
|
||||
*cached = Some(new_cache);
|
||||
} else {
|
||||
// Block was invalid; caches were already mutated by insert_state above,
|
||||
// so we must clear to prevent using polluted state
|
||||
*cached = None;
|
||||
debug!(target: "engine::caching", "cleared execution cache on invalid block");
|
||||
}
|
||||
});
|
||||
let valid = valid_block_rx.recv().is_ok();
|
||||
|
||||
execution_cache.update_with_guard(|cached| {
|
||||
if valid {
|
||||
*cached = Some(new_cache);
|
||||
} else {
|
||||
debug!(target: "engine::caching", "cleared execution cache on invalid block");
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
let elapsed = start.elapsed();
|
||||
debug!(target: "engine::caching", parent_hash=?hash, elapsed=?elapsed, "Updated execution cache");
|
||||
|
||||
@@ -10,11 +10,11 @@ use crate::tree::{
|
||||
EngineApiMetrics, EngineApiTreeState, ExecutionEnv, PayloadHandle, StateProviderBuilder,
|
||||
StateProviderDatabase, TreeConfig,
|
||||
};
|
||||
use alloy_consensus::transaction::{Either, TxHashRef};
|
||||
use alloy_consensus::{proofs::calculate_receipt_root, transaction::Either, TxHashRef, TxReceipt};
|
||||
use alloy_eip7928::BlockAccessList;
|
||||
use alloy_eips::{eip1898::BlockWithParent, eip4895::Withdrawal, NumHash};
|
||||
use alloy_evm::Evm;
|
||||
use alloy_primitives::B256;
|
||||
use alloy_primitives::{Bloom, B256};
|
||||
|
||||
use crate::tree::payload_processor::receipt_root_task::{IndexedReceipt, ReceiptRootTaskHandle};
|
||||
use reth_chain_state::{CanonicalInMemoryState, DeferredTrieData, ExecutedBlock, LazyOverlay};
|
||||
@@ -53,6 +53,16 @@ use std::{
|
||||
};
|
||||
use tracing::{debug, debug_span, error, info, instrument, trace, warn};
|
||||
|
||||
/// Blocks with at most this many transactions compute the receipt root inline to avoid
|
||||
/// background task overhead.
|
||||
const SMALL_BLOCK_RECEIPT_ROOT_TX_THRESHOLD: usize = 50;
|
||||
const SMALL_BLOCK_STATE_ROOT_TX_THRESHOLD: usize = 50;
|
||||
|
||||
enum ReceiptRootResult {
|
||||
Precomputed(ReceiptRootBloom),
|
||||
Pending(tokio::sync::oneshot::Receiver<ReceiptRootBloom>),
|
||||
}
|
||||
|
||||
/// Context providing access to tree state during validation.
|
||||
///
|
||||
/// This context is provided to the [`EngineValidator`] and includes the state of the tree's
|
||||
@@ -290,7 +300,7 @@ where
|
||||
// Validate block consensus rules which includes header validation
|
||||
if let Err(consensus_err) = self.validate_block_inner(&block) {
|
||||
// Header validation error takes precedence over execution error
|
||||
return Err(InsertBlockError::new(block, consensus_err.into()).into())
|
||||
return Err(InsertBlockError::new(block, consensus_err.into()).into());
|
||||
}
|
||||
|
||||
// Also validate against the parent
|
||||
@@ -298,7 +308,7 @@ where
|
||||
self.consensus.validate_header_against_parent(block.sealed_header(), parent_block)
|
||||
{
|
||||
// Parent validation error takes precedence over execution error
|
||||
return Err(InsertBlockError::new(block, consensus_err.into()).into())
|
||||
return Err(InsertBlockError::new(block, consensus_err.into()).into());
|
||||
}
|
||||
|
||||
// No header validation errors, return the original execution error
|
||||
@@ -338,7 +348,7 @@ where
|
||||
Ok(val) => val,
|
||||
Err(e) => {
|
||||
let block = self.convert_to_block(input)?;
|
||||
return Err(InsertBlockError::new(block, e.into()).into())
|
||||
return Err(InsertBlockError::new(block, e.into()).into());
|
||||
}
|
||||
}
|
||||
};
|
||||
@@ -371,7 +381,7 @@ where
|
||||
self.convert_to_block(input)?,
|
||||
ProviderError::HeaderNotFound(parent_hash.into()).into(),
|
||||
)
|
||||
.into())
|
||||
.into());
|
||||
};
|
||||
let mut state_provider = ensure_ok!(provider_builder.build());
|
||||
drop(_enter);
|
||||
@@ -384,7 +394,7 @@ where
|
||||
self.convert_to_block(input)?,
|
||||
ProviderError::HeaderNotFound(parent_hash.into()).into(),
|
||||
)
|
||||
.into())
|
||||
.into());
|
||||
};
|
||||
|
||||
let evm_env = debug_span!(target: "engine::tree::payload_validator", "evm env")
|
||||
@@ -401,7 +411,7 @@ where
|
||||
};
|
||||
|
||||
// Plan the strategy used for state root computation.
|
||||
let strategy = self.plan_state_root_computation();
|
||||
let strategy = self.plan_state_root_computation(input.transaction_count());
|
||||
|
||||
debug!(
|
||||
target: "engine::tree::payload_validator",
|
||||
@@ -455,7 +465,7 @@ where
|
||||
// Execute the block and handle any execution errors.
|
||||
// The receipt root task is spawned before execution and receives receipts incrementally
|
||||
// as transactions complete, allowing parallel computation during execution.
|
||||
let (output, senders, receipt_root_rx) =
|
||||
let (output, senders, receipt_root_result) =
|
||||
match self.execute_block(state_provider, env, &input, &mut handle) {
|
||||
Ok(output) => output,
|
||||
Err(err) => return self.handle_execution_error(input, err, &parent_block),
|
||||
@@ -476,15 +486,18 @@ where
|
||||
let block = self.convert_to_block(input)?.with_senders(senders);
|
||||
|
||||
// Wait for the receipt root computation to complete.
|
||||
let receipt_root_bloom = receipt_root_rx
|
||||
.blocking_recv()
|
||||
.inspect_err(|_| {
|
||||
tracing::error!(
|
||||
target: "engine::tree::payload_validator",
|
||||
"Receipt root task dropped sender without result, receipt root calculation likely aborted"
|
||||
);
|
||||
})
|
||||
.ok();
|
||||
let receipt_root_bloom = match receipt_root_result {
|
||||
ReceiptRootResult::Precomputed(receipt_root_bloom) => Some(receipt_root_bloom),
|
||||
ReceiptRootResult::Pending(receipt_root_rx) => receipt_root_rx
|
||||
.blocking_recv()
|
||||
.inspect_err(|_| {
|
||||
tracing::error!(
|
||||
target: "engine::tree::payload_validator",
|
||||
"Receipt root task dropped sender without result, receipt root calculation likely aborted"
|
||||
);
|
||||
})
|
||||
.ok(),
|
||||
};
|
||||
|
||||
let hashed_state = ensure_ok_post_block!(
|
||||
self.validate_post_execution(
|
||||
@@ -617,7 +630,7 @@ where
|
||||
)
|
||||
.into(),
|
||||
)
|
||||
.into())
|
||||
.into());
|
||||
}
|
||||
|
||||
if let Some(valid_block_tx) = valid_block_tx {
|
||||
@@ -656,12 +669,12 @@ where
|
||||
fn validate_block_inner(&self, block: &SealedBlock<N::Block>) -> Result<(), ConsensusError> {
|
||||
if let Err(e) = self.consensus.validate_header(block.sealed_header()) {
|
||||
error!(target: "engine::tree::payload_validator", ?block, "Failed to validate header {}: {e}", block.hash());
|
||||
return Err(e)
|
||||
return Err(e);
|
||||
}
|
||||
|
||||
if let Err(e) = self.consensus.validate_block_pre_execution(block) {
|
||||
error!(target: "engine::tree::payload_validator", ?block, "Failed to validate block {}: {e}", block.hash());
|
||||
return Err(e)
|
||||
return Err(e);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
@@ -671,7 +684,7 @@ where
|
||||
///
|
||||
/// This method orchestrates block execution:
|
||||
/// 1. Sets up the EVM with state database and precompile caching
|
||||
/// 2. Spawns a background task for incremental receipt root computation
|
||||
/// 2. Spawns a background task for incremental receipt root computation (if needed)
|
||||
/// 3. Executes transactions with metrics collection via state hooks
|
||||
/// 4. Merges state transitions and records execution metrics
|
||||
#[instrument(level = "debug", target = "engine::tree::payload_validator", skip_all)]
|
||||
@@ -683,11 +696,7 @@ where
|
||||
input: &BlockOrPayload<T>,
|
||||
handle: &mut PayloadHandle<impl ExecutableTxFor<Evm>, Err, N::Receipt>,
|
||||
) -> Result<
|
||||
(
|
||||
BlockExecutionOutput<N::Receipt>,
|
||||
Vec<Address>,
|
||||
tokio::sync::oneshot::Receiver<(B256, alloy_primitives::Bloom)>,
|
||||
),
|
||||
(BlockExecutionOutput<N::Receipt>, Vec<Address>, ReceiptRootResult),
|
||||
InsertBlockErrorKind,
|
||||
>
|
||||
where
|
||||
@@ -731,10 +740,16 @@ where
|
||||
// Spawn background task to compute receipt root and logs bloom incrementally.
|
||||
// Unbounded channel is used since tx count bounds capacity anyway (max ~30k txs per block).
|
||||
let receipts_len = input.transaction_count();
|
||||
let (receipt_tx, receipt_rx) = crossbeam_channel::unbounded();
|
||||
let (result_tx, result_rx) = tokio::sync::oneshot::channel();
|
||||
let task_handle = ReceiptRootTaskHandle::new(receipt_rx, result_tx);
|
||||
self.payload_processor.executor().spawn_blocking(move || task_handle.run(receipts_len));
|
||||
let compute_receipt_root_inline = receipts_len <= SMALL_BLOCK_RECEIPT_ROOT_TX_THRESHOLD;
|
||||
let (receipt_tx, result_rx) = if compute_receipt_root_inline {
|
||||
(None, None)
|
||||
} else {
|
||||
let (receipt_tx, receipt_rx) = crossbeam_channel::unbounded();
|
||||
let (result_tx, result_rx) = tokio::sync::oneshot::channel();
|
||||
let task_handle = ReceiptRootTaskHandle::new(receipt_rx, result_tx);
|
||||
self.payload_processor.executor().spawn_blocking(move || task_handle.run(receipts_len));
|
||||
(Some(receipt_tx), Some(result_rx))
|
||||
};
|
||||
|
||||
let transaction_count = input.transaction_count();
|
||||
let executor = executor.with_state_hook(Some(Box::new(handle.state_hook())));
|
||||
@@ -746,7 +761,7 @@ where
|
||||
executor,
|
||||
transaction_count,
|
||||
handle.iter_transactions(),
|
||||
&receipt_tx,
|
||||
receipt_tx.as_ref(),
|
||||
)?;
|
||||
drop(receipt_tx);
|
||||
|
||||
@@ -763,11 +778,21 @@ where
|
||||
|
||||
let output = BlockExecutionOutput { result, state: db.take_bundle() };
|
||||
|
||||
let receipt_root_result = if compute_receipt_root_inline {
|
||||
ReceiptRootResult::Precomputed(Self::compute_receipt_root_bloom(
|
||||
&output.result.receipts,
|
||||
))
|
||||
} else {
|
||||
ReceiptRootResult::Pending(
|
||||
result_rx.expect("receipt root receiver missing when task spawned"),
|
||||
)
|
||||
};
|
||||
|
||||
let execution_duration = execution_start.elapsed();
|
||||
self.metrics.record_block_execution(&output, execution_duration);
|
||||
|
||||
debug!(target: "engine::tree::payload_validator", elapsed = ?execution_duration, "Executed block");
|
||||
Ok((output, senders, result_rx))
|
||||
Ok((output, senders, receipt_root_result))
|
||||
}
|
||||
|
||||
/// Executes transactions and collects senders, streaming receipts to a background task.
|
||||
@@ -784,7 +809,7 @@ where
|
||||
mut executor: E,
|
||||
transaction_count: usize,
|
||||
transactions: impl Iterator<Item = Result<Tx, Err>>,
|
||||
receipt_tx: &crossbeam_channel::Sender<IndexedReceipt<N::Receipt>>,
|
||||
receipt_tx: Option<&crossbeam_channel::Sender<IndexedReceipt<N::Receipt>>>,
|
||||
) -> Result<(E, Vec<Address>), BlockExecutionError>
|
||||
where
|
||||
E: BlockExecutor<Receipt = N::Receipt>,
|
||||
@@ -831,13 +856,15 @@ where
|
||||
executor.execute_transaction(tx)?;
|
||||
self.metrics.record_transaction_execution(tx_start.elapsed());
|
||||
|
||||
let current_len = executor.receipts().len();
|
||||
if current_len > last_sent_len {
|
||||
last_sent_len = current_len;
|
||||
// Send the latest receipt to the background task for incremental root computation.
|
||||
if let Some(receipt) = executor.receipts().last() {
|
||||
let tx_index = current_len - 1;
|
||||
let _ = receipt_tx.send(IndexedReceipt::new(tx_index, receipt.clone()));
|
||||
if let Some(receipt_tx) = receipt_tx {
|
||||
let current_len = executor.receipts().len();
|
||||
if current_len > last_sent_len {
|
||||
last_sent_len = current_len;
|
||||
// Send the latest receipt to the background task for incremental root computation.
|
||||
if let Some(receipt) = executor.receipts().last() {
|
||||
let tx_index = current_len - 1;
|
||||
let _ = receipt_tx.send(IndexedReceipt::new(tx_index, receipt.clone()));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1073,7 +1100,7 @@ where
|
||||
trace!(target: "engine::tree::payload_validator", block=?block.num_hash(), "Validating block consensus");
|
||||
// validate block consensus rules
|
||||
if let Err(e) = self.validate_block_inner(block) {
|
||||
return Err(e.into())
|
||||
return Err(e.into());
|
||||
}
|
||||
|
||||
// now validate against the parent
|
||||
@@ -1082,7 +1109,7 @@ where
|
||||
self.consensus.validate_header_against_parent(block.sealed_header(), parent_block)
|
||||
{
|
||||
warn!(target: "engine::tree::payload_validator", ?block, "Failed to validate header {} against parent: {e}", block.hash());
|
||||
return Err(e.into())
|
||||
return Err(e.into());
|
||||
}
|
||||
drop(_enter);
|
||||
|
||||
@@ -1095,7 +1122,7 @@ where
|
||||
{
|
||||
// call post-block hook
|
||||
self.on_invalid_block(parent_block, block, output, None, ctx.state_mut());
|
||||
return Err(err.into())
|
||||
return Err(err.into());
|
||||
}
|
||||
drop(_enter);
|
||||
|
||||
@@ -1110,7 +1137,7 @@ where
|
||||
{
|
||||
// call post-block hook
|
||||
self.on_invalid_block(parent_block, block, output, None, ctx.state_mut());
|
||||
return Err(err.into())
|
||||
return Err(err.into());
|
||||
}
|
||||
|
||||
// record post-execution validation duration
|
||||
@@ -1122,6 +1149,16 @@ where
|
||||
Ok(hashed_state)
|
||||
}
|
||||
|
||||
fn compute_receipt_root_bloom(receipts: &[N::Receipt]) -> ReceiptRootBloom {
|
||||
let receipts_with_bloom =
|
||||
receipts.iter().map(TxReceipt::with_bloom_ref).collect::<Vec<_>>();
|
||||
let receipts_root = calculate_receipt_root(&receipts_with_bloom);
|
||||
let logs_bloom = receipts_with_bloom
|
||||
.iter()
|
||||
.fold(Bloom::ZERO, |bloom, receipt| bloom | receipt.bloom_ref());
|
||||
(receipts_root, logs_bloom)
|
||||
}
|
||||
|
||||
/// Spawns a payload processor task based on the state root strategy.
|
||||
///
|
||||
/// This method determines how to execute the block and compute its state root based on
|
||||
@@ -1218,7 +1255,7 @@ where
|
||||
self.provider.clone(),
|
||||
historical,
|
||||
Some(blocks),
|
||||
)))
|
||||
)));
|
||||
}
|
||||
|
||||
// Check if the block is persisted
|
||||
@@ -1226,7 +1263,7 @@ where
|
||||
debug!(target: "engine::tree::payload_validator", %hash, number = %header.number(), "found canonical state for block in database, creating provider builder");
|
||||
// For persisted blocks, we create a builder that will fetch state directly from the
|
||||
// database
|
||||
return Ok(Some(StateProviderBuilder::new(self.provider.clone(), hash, None)))
|
||||
return Ok(Some(StateProviderBuilder::new(self.provider.clone(), hash, None)));
|
||||
}
|
||||
|
||||
debug!(target: "engine::tree::payload_validator", %hash, "no canonical state found for block");
|
||||
@@ -1237,7 +1274,12 @@ where
|
||||
///
|
||||
/// Note: Use state root task only if prefix sets are empty, otherwise proof generation is
|
||||
/// too expensive because it requires walking all paths in every proof.
|
||||
const fn plan_state_root_computation(&self) -> StateRootStrategy {
|
||||
fn plan_state_root_computation(&self, transaction_count: usize) -> StateRootStrategy {
|
||||
// Small blocks are faster without spawning parallel state root tasks.
|
||||
if transaction_count > 0 && transaction_count <= SMALL_BLOCK_STATE_ROOT_TX_THRESHOLD {
|
||||
return StateRootStrategy::Synchronous;
|
||||
}
|
||||
|
||||
if self.config.state_root_fallback() {
|
||||
StateRootStrategy::Synchronous
|
||||
} else if self.config.use_state_root_task() {
|
||||
@@ -1258,7 +1300,7 @@ where
|
||||
) {
|
||||
if state.invalid_headers.get(&block.hash()).is_some() {
|
||||
// we already marked this block as invalid
|
||||
return
|
||||
return;
|
||||
}
|
||||
self.invalid_block_hook.on_invalid_block(parent_header, block, output, trie_updates);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user