Compare commits

...

3 Commits
t4 ... yk/deep

Author SHA1 Message Date
yongkangc
aab08b82ea perf: use synchronous state root for small blocks
Amp-Thread-ID: https://ampcode.com/threads/T-019c649c-0e7d-74ce-9e41-be800680229a
2026-02-16 04:06:31 +00:00
yongkangc
6d357545ef perf: inline receipt root for small payloads
Amp-Thread-ID: https://ampcode.com/threads/T-019c6479-e793-7149-9d33-d7cb75d0adc1
2026-02-16 03:44:51 +00:00
yongkangc
2740793e42 perf(engine): reduce save_cache write lock hold time
Move insert_state, update_metrics, and valid_block_rx.recv() outside
the PayloadExecutionCache write lock. A short take() under the lock
first detaches the published cache so readers see None during the
update window, preventing observation of partially-mutated Arc-shared
ExecutionCache state.

Amp-Thread-ID: https://ampcode.com/threads/T-019c42d4-fff9-727c-8256-eb3684deaba4
2026-02-16 02:48:53 +00:00
2 changed files with 113 additions and 74 deletions

View File

@@ -236,36 +236,33 @@ where
if let Some(saved_cache) = saved_cache {
debug!(target: "engine::caching", parent_hash=?hash, "Updating execution cache");
// Perform all cache operations atomically under the lock
// Detach the published cache so readers see None during the update.
// This is necessary because ExecutionCache is Arc-shared: mutating
// it via insert_state would be visible through the old SavedCache.
execution_cache.update_with_guard(|cached| {
// consumes the `SavedCache` held by the prewarming task, which releases its usage
// guard
let (caches, cache_metrics, disable_cache_metrics) = saved_cache.split();
let new_cache = SavedCache::new(hash, caches, cache_metrics)
.with_disable_cache_metrics(disable_cache_metrics);
cached.take();
});
// Insert state into cache while holding the lock
// Access the BundleState through the shared ExecutionOutcome
if new_cache.cache().insert_state(&execution_outcome.state).is_err() {
// Clear the cache on error to prevent having a polluted cache
*cached = None;
debug!(target: "engine::caching", "cleared execution cache on update error");
return;
}
let (caches, cache_metrics, disable_cache_metrics) = saved_cache.split();
let new_cache = SavedCache::new(hash, caches, cache_metrics)
.with_disable_cache_metrics(disable_cache_metrics);
if new_cache.cache().insert_state(&execution_outcome.state).is_err() {
debug!(target: "engine::caching", "cleared execution cache on update error");
} else {
new_cache.update_metrics();
if valid_block_rx.recv().is_ok() {
// Replace the shared cache with the new one; the previous cache (if any) is
// dropped.
*cached = Some(new_cache);
} else {
// Block was invalid; caches were already mutated by insert_state above,
// so we must clear to prevent using polluted state
*cached = None;
debug!(target: "engine::caching", "cleared execution cache on invalid block");
}
});
let valid = valid_block_rx.recv().is_ok();
execution_cache.update_with_guard(|cached| {
if valid {
*cached = Some(new_cache);
} else {
debug!(target: "engine::caching", "cleared execution cache on invalid block");
}
});
}
let elapsed = start.elapsed();
debug!(target: "engine::caching", parent_hash=?hash, elapsed=?elapsed, "Updated execution cache");

View File

@@ -10,11 +10,11 @@ use crate::tree::{
EngineApiMetrics, EngineApiTreeState, ExecutionEnv, PayloadHandle, StateProviderBuilder,
StateProviderDatabase, TreeConfig,
};
use alloy_consensus::transaction::{Either, TxHashRef};
use alloy_consensus::{proofs::calculate_receipt_root, transaction::Either, TxHashRef, TxReceipt};
use alloy_eip7928::BlockAccessList;
use alloy_eips::{eip1898::BlockWithParent, eip4895::Withdrawal, NumHash};
use alloy_evm::Evm;
use alloy_primitives::B256;
use alloy_primitives::{Bloom, B256};
use crate::tree::payload_processor::receipt_root_task::{IndexedReceipt, ReceiptRootTaskHandle};
use reth_chain_state::{CanonicalInMemoryState, DeferredTrieData, ExecutedBlock, LazyOverlay};
@@ -53,6 +53,16 @@ use std::{
};
use tracing::{debug, debug_span, error, info, instrument, trace, warn};
/// Blocks with at most this many transactions compute the receipt root inline to avoid
/// background task overhead.
const SMALL_BLOCK_RECEIPT_ROOT_TX_THRESHOLD: usize = 50;
/// Non-empty blocks with at most this many transactions use the synchronous state root
/// strategy, regardless of the configured state root settings.
const SMALL_BLOCK_STATE_ROOT_TX_THRESHOLD: usize = 50;
/// How block execution hands the receipt root and logs bloom back to its caller.
enum ReceiptRootResult {
/// The receipt root and logs bloom were computed inline and are available immediately.
Precomputed(ReceiptRootBloom),
/// The result is produced by the spawned receipt root task; the caller must receive it
/// from this channel (the sender may be dropped without sending a result).
Pending(tokio::sync::oneshot::Receiver<ReceiptRootBloom>),
}
/// Context providing access to tree state during validation.
///
/// This context is provided to the [`EngineValidator`] and includes the state of the tree's
@@ -290,7 +300,7 @@ where
// Validate block consensus rules which includes header validation
if let Err(consensus_err) = self.validate_block_inner(&block) {
// Header validation error takes precedence over execution error
return Err(InsertBlockError::new(block, consensus_err.into()).into())
return Err(InsertBlockError::new(block, consensus_err.into()).into());
}
// Also validate against the parent
@@ -298,7 +308,7 @@ where
self.consensus.validate_header_against_parent(block.sealed_header(), parent_block)
{
// Parent validation error takes precedence over execution error
return Err(InsertBlockError::new(block, consensus_err.into()).into())
return Err(InsertBlockError::new(block, consensus_err.into()).into());
}
// No header validation errors, return the original execution error
@@ -338,7 +348,7 @@ where
Ok(val) => val,
Err(e) => {
let block = self.convert_to_block(input)?;
return Err(InsertBlockError::new(block, e.into()).into())
return Err(InsertBlockError::new(block, e.into()).into());
}
}
};
@@ -371,7 +381,7 @@ where
self.convert_to_block(input)?,
ProviderError::HeaderNotFound(parent_hash.into()).into(),
)
.into())
.into());
};
let mut state_provider = ensure_ok!(provider_builder.build());
drop(_enter);
@@ -384,7 +394,7 @@ where
self.convert_to_block(input)?,
ProviderError::HeaderNotFound(parent_hash.into()).into(),
)
.into())
.into());
};
let evm_env = debug_span!(target: "engine::tree::payload_validator", "evm env")
@@ -401,7 +411,7 @@ where
};
// Plan the strategy used for state root computation.
let strategy = self.plan_state_root_computation();
let strategy = self.plan_state_root_computation(input.transaction_count());
debug!(
target: "engine::tree::payload_validator",
@@ -455,7 +465,7 @@ where
// Execute the block and handle any execution errors.
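// For blocks above the small-block threshold, a receipt root task is spawned before
// execution and receives receipts incrementally as transactions complete, allowing parallel
// computation during execution. Smaller blocks compute the receipt root inline instead.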
let (output, senders, receipt_root_rx) =
let (output, senders, receipt_root_result) =
match self.execute_block(state_provider, env, &input, &mut handle) {
Ok(output) => output,
Err(err) => return self.handle_execution_error(input, err, &parent_block),
@@ -476,15 +486,18 @@ where
let block = self.convert_to_block(input)?.with_senders(senders);
// Use the inline receipt root if it was precomputed; otherwise wait for the background
// receipt root computation to complete.
let receipt_root_bloom = receipt_root_rx
.blocking_recv()
.inspect_err(|_| {
tracing::error!(
target: "engine::tree::payload_validator",
"Receipt root task dropped sender without result, receipt root calculation likely aborted"
);
})
.ok();
let receipt_root_bloom = match receipt_root_result {
ReceiptRootResult::Precomputed(receipt_root_bloom) => Some(receipt_root_bloom),
ReceiptRootResult::Pending(receipt_root_rx) => receipt_root_rx
.blocking_recv()
.inspect_err(|_| {
tracing::error!(
target: "engine::tree::payload_validator",
"Receipt root task dropped sender without result, receipt root calculation likely aborted"
);
})
.ok(),
};
let hashed_state = ensure_ok_post_block!(
self.validate_post_execution(
@@ -617,7 +630,7 @@ where
)
.into(),
)
.into())
.into());
}
if let Some(valid_block_tx) = valid_block_tx {
@@ -656,12 +669,12 @@ where
fn validate_block_inner(&self, block: &SealedBlock<N::Block>) -> Result<(), ConsensusError> {
if let Err(e) = self.consensus.validate_header(block.sealed_header()) {
error!(target: "engine::tree::payload_validator", ?block, "Failed to validate header {}: {e}", block.hash());
return Err(e)
return Err(e);
}
if let Err(e) = self.consensus.validate_block_pre_execution(block) {
error!(target: "engine::tree::payload_validator", ?block, "Failed to validate block {}: {e}", block.hash());
return Err(e)
return Err(e);
}
Ok(())
@@ -671,7 +684,7 @@ where
///
/// This method orchestrates block execution:
/// 1. Sets up the EVM with state database and precompile caching
/// 2. Spawns a background task for incremental receipt root computation
/// 2. Spawns a background task for incremental receipt root computation (if needed)
/// 3. Executes transactions with metrics collection via state hooks
/// 4. Merges state transitions and records execution metrics
#[instrument(level = "debug", target = "engine::tree::payload_validator", skip_all)]
@@ -683,11 +696,7 @@ where
input: &BlockOrPayload<T>,
handle: &mut PayloadHandle<impl ExecutableTxFor<Evm>, Err, N::Receipt>,
) -> Result<
(
BlockExecutionOutput<N::Receipt>,
Vec<Address>,
tokio::sync::oneshot::Receiver<(B256, alloy_primitives::Bloom)>,
),
(BlockExecutionOutput<N::Receipt>, Vec<Address>, ReceiptRootResult),
InsertBlockErrorKind,
>
where
@@ -731,10 +740,16 @@ where
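// For blocks above the small-block threshold, spawn a background task to compute receipt
// root and logs bloom incrementally; smaller blocks compute both inline after execution.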
// Unbounded channel is used since tx count bounds capacity anyway (max ~30k txs per block).
let receipts_len = input.transaction_count();
let (receipt_tx, receipt_rx) = crossbeam_channel::unbounded();
let (result_tx, result_rx) = tokio::sync::oneshot::channel();
let task_handle = ReceiptRootTaskHandle::new(receipt_rx, result_tx);
self.payload_processor.executor().spawn_blocking(move || task_handle.run(receipts_len));
let compute_receipt_root_inline = receipts_len <= SMALL_BLOCK_RECEIPT_ROOT_TX_THRESHOLD;
let (receipt_tx, result_rx) = if compute_receipt_root_inline {
(None, None)
} else {
let (receipt_tx, receipt_rx) = crossbeam_channel::unbounded();
let (result_tx, result_rx) = tokio::sync::oneshot::channel();
let task_handle = ReceiptRootTaskHandle::new(receipt_rx, result_tx);
self.payload_processor.executor().spawn_blocking(move || task_handle.run(receipts_len));
(Some(receipt_tx), Some(result_rx))
};
let transaction_count = input.transaction_count();
let executor = executor.with_state_hook(Some(Box::new(handle.state_hook())));
@@ -746,7 +761,7 @@ where
executor,
transaction_count,
handle.iter_transactions(),
&receipt_tx,
receipt_tx.as_ref(),
)?;
drop(receipt_tx);
@@ -763,11 +778,21 @@ where
let output = BlockExecutionOutput { result, state: db.take_bundle() };
let receipt_root_result = if compute_receipt_root_inline {
ReceiptRootResult::Precomputed(Self::compute_receipt_root_bloom(
&output.result.receipts,
))
} else {
ReceiptRootResult::Pending(
result_rx.expect("receipt root receiver missing when task spawned"),
)
};
let execution_duration = execution_start.elapsed();
self.metrics.record_block_execution(&output, execution_duration);
debug!(target: "engine::tree::payload_validator", elapsed = ?execution_duration, "Executed block");
Ok((output, senders, result_rx))
Ok((output, senders, receipt_root_result))
}
/// Executes transactions and collects senders, streaming receipts to a background task
/// when a receipt sender is provided.
@@ -784,7 +809,7 @@ where
mut executor: E,
transaction_count: usize,
transactions: impl Iterator<Item = Result<Tx, Err>>,
receipt_tx: &crossbeam_channel::Sender<IndexedReceipt<N::Receipt>>,
receipt_tx: Option<&crossbeam_channel::Sender<IndexedReceipt<N::Receipt>>>,
) -> Result<(E, Vec<Address>), BlockExecutionError>
where
E: BlockExecutor<Receipt = N::Receipt>,
@@ -831,13 +856,15 @@ where
executor.execute_transaction(tx)?;
self.metrics.record_transaction_execution(tx_start.elapsed());
let current_len = executor.receipts().len();
if current_len > last_sent_len {
last_sent_len = current_len;
// Send the latest receipt to the background task for incremental root computation.
if let Some(receipt) = executor.receipts().last() {
let tx_index = current_len - 1;
let _ = receipt_tx.send(IndexedReceipt::new(tx_index, receipt.clone()));
if let Some(receipt_tx) = receipt_tx {
let current_len = executor.receipts().len();
if current_len > last_sent_len {
last_sent_len = current_len;
// Send the latest receipt to the background task for incremental root computation.
if let Some(receipt) = executor.receipts().last() {
let tx_index = current_len - 1;
let _ = receipt_tx.send(IndexedReceipt::new(tx_index, receipt.clone()));
}
}
}
}
@@ -1073,7 +1100,7 @@ where
trace!(target: "engine::tree::payload_validator", block=?block.num_hash(), "Validating block consensus");
// validate block consensus rules
if let Err(e) = self.validate_block_inner(block) {
return Err(e.into())
return Err(e.into());
}
// now validate against the parent
@@ -1082,7 +1109,7 @@ where
self.consensus.validate_header_against_parent(block.sealed_header(), parent_block)
{
warn!(target: "engine::tree::payload_validator", ?block, "Failed to validate header {} against parent: {e}", block.hash());
return Err(e.into())
return Err(e.into());
}
drop(_enter);
@@ -1095,7 +1122,7 @@ where
{
// call post-block hook
self.on_invalid_block(parent_block, block, output, None, ctx.state_mut());
return Err(err.into())
return Err(err.into());
}
drop(_enter);
@@ -1110,7 +1137,7 @@ where
{
// call post-block hook
self.on_invalid_block(parent_block, block, output, None, ctx.state_mut());
return Err(err.into())
return Err(err.into());
}
// record post-execution validation duration
@@ -1122,6 +1149,16 @@ where
Ok(hashed_state)
}
/// Computes the receipt root and the combined logs bloom for `receipts` on the calling
/// thread.
///
/// Returns the `(receipt_root, logs_bloom)` pair; for an empty slice the bloom is
/// `Bloom::ZERO`.
fn compute_receipt_root_bloom(receipts: &[N::Receipt]) -> ReceiptRootBloom {
    // Build the bloom-carrying view of each receipt once; both the root and the aggregate
    // bloom below read from it.
    let with_blooms: Vec<_> = receipts.iter().map(TxReceipt::with_bloom_ref).collect();

    let root = calculate_receipt_root(&with_blooms);

    // OR every per-receipt bloom into a single accumulator, starting from the empty bloom.
    let mut logs_bloom = Bloom::ZERO;
    for entry in &with_blooms {
        logs_bloom = logs_bloom | entry.bloom_ref();
    }

    (root, logs_bloom)
}
/// Spawns a payload processor task based on the state root strategy.
///
/// This method determines how to execute the block and compute its state root based on
@@ -1218,7 +1255,7 @@ where
self.provider.clone(),
historical,
Some(blocks),
)))
)));
}
// Check if the block is persisted
@@ -1226,7 +1263,7 @@ where
debug!(target: "engine::tree::payload_validator", %hash, number = %header.number(), "found canonical state for block in database, creating provider builder");
// For persisted blocks, we create a builder that will fetch state directly from the
// database
return Ok(Some(StateProviderBuilder::new(self.provider.clone(), hash, None)))
return Ok(Some(StateProviderBuilder::new(self.provider.clone(), hash, None)));
}
debug!(target: "engine::tree::payload_validator", %hash, "no canonical state found for block");
@@ -1237,7 +1274,12 @@ where
///
/// Note: Use state root task only if prefix sets are empty, otherwise proof generation is
/// too expensive because it requires walking all paths in every proof.
const fn plan_state_root_computation(&self) -> StateRootStrategy {
fn plan_state_root_computation(&self, transaction_count: usize) -> StateRootStrategy {
// Small blocks are faster without spawning parallel state root tasks.
if transaction_count > 0 && transaction_count <= SMALL_BLOCK_STATE_ROOT_TX_THRESHOLD {
return StateRootStrategy::Synchronous;
}
if self.config.state_root_fallback() {
StateRootStrategy::Synchronous
} else if self.config.use_state_root_task() {
@@ -1258,7 +1300,7 @@ where
) {
if state.invalid_headers.get(&block.hash()).is_some() {
// we already marked this block as invalid
return
return;
}
self.invalid_block_hook.on_invalid_block(parent_header, block, output, trie_updates);
}