@@ -71,7 +71,7 @@ where
     /// Internal function used to advance the chain.
     ///
     /// Polls the `ChainOrchestrator` for the next event.
-    #[tracing::instrument(name = "ChainOrchestrator::poll", skip(self, cx))]
+    #[tracing::instrument(level = "debug", name = "ChainOrchestrator::poll", skip(self, cx))]
     fn poll_next_event(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<ChainEvent<T::Event>> {
         let this = self.get_mut();

@@ -18,7 +18,7 @@ use reth_trie::{
     MultiProofTargets, StorageMultiProof, StorageProof, TrieInput,
 };
 use std::{sync::Arc, time::Duration};
-use tracing::{debug_span, instrument, trace};
+use tracing::trace;
 
 pub(crate) type Cache<K, V> =
     mini_moka::sync::Cache<K, V, alloy_primitives::map::DefaultHashBuilder>;

@@ -354,7 +354,6 @@ impl ExecutionCache {
     }
 
     /// Invalidates the storage for all addresses in the set
-    #[instrument(level = "debug", target = "engine::tree", skip_all, fields(accounts = addresses.len()))]
     pub(crate) fn invalidate_storages(&self, addresses: HashSet<&Address>) {
         // NOTE: this must collect because the invalidate function should not be called while we
         // hold an iter for it

@@ -386,25 +385,12 @@ impl ExecutionCache {
     /// ## Error Handling
     ///
     /// Returns an error if the state updates are inconsistent and should be discarded.
-    #[instrument(level = "debug", target = "engine::tree", skip_all)]
     pub(crate) fn insert_state(&self, state_updates: &BundleState) -> Result<(), ()> {
-        let _enter =
-            debug_span!(target: "engine::tree", "contracts", len = state_updates.contracts.len())
-                .entered();
         // Insert bytecodes
         for (code_hash, bytecode) in &state_updates.contracts {
             self.code_cache.insert(*code_hash, Some(Bytecode(bytecode.clone())));
         }
-        drop(_enter);
 
-        let _enter = debug_span!(
-            target: "engine::tree",
-            "accounts",
-            accounts = state_updates.state.len(),
-            storages =
-                state_updates.state.values().map(|account| account.storage.len()).sum::<usize>()
-        )
-        .entered();
         let mut invalidated_accounts = HashSet::default();
         for (addr, account) in &state_updates.state {
             // If the account was not modified, as in not changed and not destroyed, then we have

@@ -79,7 +79,7 @@ impl EngineApiMetrics {
         for tx in transactions {
             let tx = tx?;
             let span =
-                debug_span!(target: "engine::tree", "execute tx", tx_hash=?tx.tx().tx_hash());
+                debug_span!(target: "engine::tree", "execute_tx", tx_hash=?tx.tx().tx_hash());
             let _enter = span.enter();
             trace!(target: "engine::tree", "Executing transaction");
             executor.execute_transaction(tx)?;

@@ -496,12 +496,7 @@ where
     ///
     /// This returns a [`PayloadStatus`] that represents the outcome of a processed new payload and
     /// returns an error if an internal error occurred.
-    #[instrument(
-        level = "debug",
-        target = "engine::tree",
-        skip_all,
-        fields(block_hash = %payload.block_hash(), block_num = %payload.block_number()),
-    )]
+    #[instrument(level = "trace", skip_all, fields(block_hash = %payload.block_hash(), block_num = %payload.block_number(),), target = "engine::tree")]
     fn on_new_payload(
         &mut self,
         payload: T::ExecutionData,

@@ -582,7 +577,6 @@ where
     /// - `Valid`: Payload successfully validated and inserted
     /// - `Syncing`: Parent missing, payload buffered for later
     /// - Error status: Payload is invalid
-    #[instrument(level = "debug", target = "engine::tree", skip_all)]
     fn try_insert_payload(
         &mut self,
         payload: T::ExecutionData,

@@ -976,7 +970,7 @@ where
     /// `engine_forkchoiceUpdated`](https://github.com/ethereum/execution-apis/blob/main/src/engine/paris.md#specification-1).
     ///
     /// Returns an error if an internal error occurred like a database error.
-    #[instrument(level = "debug", target = "engine::tree", skip_all, fields(head = % state.head_block_hash, safe = % state.safe_block_hash,finalized = % state.finalized_block_hash))]
+    #[instrument(level = "trace", skip_all, fields(head = % state.head_block_hash, safe = % state.safe_block_hash,finalized = % state.finalized_block_hash), target = "engine::tree")]
     fn on_forkchoice_updated(
         &mut self,
         state: ForkchoiceState,

@@ -1978,7 +1972,7 @@ where
     }
 
     /// Attempts to connect any buffered blocks that are connected to the given parent hash.
-    #[instrument(level = "debug", target = "engine::tree", skip(self))]
+    #[instrument(level = "trace", skip(self), target = "engine::tree")]
     fn try_connect_buffered_blocks(
         &mut self,
         parent: BlockNumHash,

@@ -2287,7 +2281,7 @@ where
     /// Returns an event with the appropriate action to take, such as:
     /// - download more missing blocks
     /// - try to canonicalize the target if the `block` is the tracked target (head) block.
-    #[instrument(level = "debug", target = "engine::tree", skip_all, fields(block_hash = %block.hash(), block_num = %block.number()))]
+    #[instrument(level = "trace", skip_all, fields(block_hash = %block.hash(), block_num = %block.number(),), target = "engine::tree")]
     fn on_downloaded_block(
         &mut self,
         block: RecoveredBlock<N::Block>,

@@ -2393,7 +2387,6 @@ where
     /// Returns `InsertPayloadOk::Inserted(BlockStatus::Valid)` on successful execution,
     /// `InsertPayloadOk::AlreadySeen` if the block already exists, or
    /// `InsertPayloadOk::Inserted(BlockStatus::Disconnected)` if parent state is missing.
-    #[instrument(level = "debug", target = "engine::tree", skip_all, fields(block_id))]
     fn insert_block_or_payload<Input, Err>(
         &mut self,
         block_id: BlockWithParent,

@@ -45,7 +45,7 @@ use std::sync::{
     mpsc::{self, channel, Sender},
     Arc,
 };
-use tracing::{debug, debug_span, instrument, warn};
+use tracing::{debug, instrument, warn};
 
 mod configured_sparse_trie;
 pub mod executor;

@@ -167,12 +167,6 @@ where
     /// This returns a handle to await the final state root and to interact with the tasks (e.g.
     /// canceling)
     #[allow(clippy::type_complexity)]
-    #[instrument(
-        level = "debug",
-        target = "engine::tree::payload_processor",
-        name = "payload processor",
-        skip_all
-    )]
     pub fn spawn<P, I: ExecutableTxIterator<Evm>>(
         &mut self,
         env: ExecutionEnv<Evm>,

@@ -242,9 +236,7 @@ where
         );
 
         // spawn multi-proof task
-        let span = tracing::Span::current();
         self.executor.spawn_blocking(move || {
-            let _enter = span.entered();
             multi_proof_task.run();
         });
 
@@ -265,7 +257,6 @@ where
     /// Spawns a task that exclusively handles cache prewarming for transaction execution.
     ///
     /// Returns a [`PayloadHandle`] to communicate with the task.
-    #[instrument(level = "debug", target = "engine::tree::payload_processor", skip_all)]
     pub(super) fn spawn_cache_exclusive<P, I: ExecutableTxIterator<Evm>>(
         &self,
         env: ExecutionEnv<Evm>,

@@ -362,9 +353,7 @@ where
         // spawn pre-warm task
         {
             let to_prewarm_task = to_prewarm_task.clone();
-            let span = debug_span!(target: "engine::tree::payload_processor", "prewarm task");
             self.executor.spawn_blocking(move || {
-                let _enter = span.entered();
                 prewarm_task.run(transactions, to_prewarm_task);
             });
         }

@@ -381,7 +370,7 @@ where
     ///
     /// If the given hash is different then what is recently cached, then this will create a new
     /// instance.
-    #[instrument(level = "debug", target = "engine::caching", skip(self))]
+    #[instrument(target = "engine::caching", skip(self))]
     fn cache_for(&self, parent_hash: B256) -> SavedCache {
         if let Some(cache) = self.execution_cache.get_cache_for(parent_hash) {
             debug!("reusing execution cache");

@@ -394,7 +383,6 @@ where
     }
 
     /// Spawns the [`SparseTrieTask`] for this payload processor.
-    #[instrument(level = "debug", target = "engine::tree::payload_processor", skip_all)]
     fn spawn_sparse_trie_task<BPF>(
         &self,
         sparse_trie_rx: mpsc::Receiver<SparseTrieUpdate>,

@@ -433,18 +421,13 @@ where
             sparse_state_trie,
         );
 
-        let span = tracing::Span::current();
         self.executor.spawn_blocking(move || {
-            let _enter = span.entered();
-
             let (result, trie) = task.run();
             // Send state root computation result
             let _ = state_root_tx.send(result);
 
-            // Clear the SparseStateTrie and replace it back into the mutex _after_ sending
-            // results to the next step, so that time spent clearing doesn't block the step after
-            // this one.
-            let _enter = debug_span!(target: "engine::tree::payload_processor", "clear").entered();
+            // Clear the SparseStateTrie and replace it back into the mutex _after_ sending results
+            // to the next step, so that time spent clearing doesn't block the step after this one.
             cleared_sparse_trie.lock().replace(ClearedSparseStateTrie::from_state_trie(trie));
         });
     }

@@ -469,7 +452,6 @@ impl<Tx, Err> PayloadHandle<Tx, Err> {
     /// # Panics
     ///
     /// If payload processing was started without background tasks.
-    #[instrument(level = "debug", target = "engine::tree::payload_processor", skip_all)]
     pub fn state_root(&mut self) -> Result<StateRootComputeOutcome, ParallelStateRootError> {
         self.state_root
             .take()

@@ -32,7 +32,7 @@ use std::{
     },
     time::{Duration, Instant},
 };
-use tracing::{debug, error, instrument, trace};
+use tracing::{debug, error, trace};
 
 /// A trie update that can be applied to sparse trie alongside the proofs for touched parts of the
 /// state.

@@ -718,7 +718,6 @@ impl MultiProofTask {
     /// Handles request for proof prefetch.
     ///
     /// Returns a number of proofs that were spawned.
-    #[instrument(level = "debug", target = "engine::tree::payload_processor::multiproof", skip_all, fields(accounts = targets.len()))]
     fn on_prefetch_proof(&mut self, targets: MultiProofTargets) -> u64 {
         let proof_targets = self.get_prefetch_proof_targets(targets);
         self.fetched_proof_targets.extend_ref(&proof_targets);

@@ -845,7 +844,6 @@ impl MultiProofTask {
     /// Handles state updates.
     ///
     /// Returns a number of proofs that were spawned.
-    #[instrument(level = "debug", target = "engine::tree::payload_processor::multiproof", skip(self, update), fields(accounts = update.len()))]
     fn on_state_update(&mut self, source: StateChangeSource, update: EvmState) -> u64 {
         let hashed_state_update = evm_state_to_hashed_post_state(update);

@@ -975,7 +973,6 @@ impl MultiProofTask {
     /// currently being calculated, or if there are any pending proofs in the proof sequencer
     /// left to be revealed by checking the pending tasks.
     /// 6. This task exits after all pending proofs are processed.
-    #[instrument(level = "debug", target = "engine::tree::payload_processor::multiproof", skip_all)]
     pub(crate) fn run(mut self) {
         // TODO convert those into fields
         let mut prefetch_proofs_requested = 0;

@@ -39,7 +39,7 @@ use std::{
     },
     time::Instant,
 };
-use tracing::{debug, debug_span, instrument, trace, warn};
+use tracing::{debug, trace, warn};
 
 /// A wrapper for transactions that includes their index in the block.
 #[derive(Clone)]

@@ -139,11 +139,8 @@ where
         let ctx = self.ctx.clone();
         let max_concurrency = self.max_concurrency;
         let transaction_count_hint = self.transaction_count_hint;
-        let span = tracing::Span::current();
 
         self.executor.spawn_blocking(move || {
-            let _enter = debug_span!(target: "engine::tree::payload_processor::prewarm", parent: span, "spawn_all").entered();
-
             let (done_tx, done_rx) = mpsc::channel();
             let mut executing = 0usize;

@@ -160,8 +157,8 @@ where
             };
 
             // Only spawn initial workers as needed
-            for i in 0..workers_needed {
-                handles.push(ctx.spawn_worker(i, &executor, actions_tx.clone(), done_tx.clone()));
+            for _ in 0..workers_needed {
+                handles.push(ctx.spawn_worker(&executor, actions_tx.clone(), done_tx.clone()));
             }
 
             let mut tx_index = 0usize;

@@ -251,7 +248,6 @@ where
     /// the new, warmed cache to be inserted.
     ///
     /// This method is called from `run()` only after all execution tasks are complete.
-    #[instrument(level = "debug", target = "engine::tree::payload_processor::prewarm", skip_all)]
     fn save_cache(self, state: BundleState) {
         let start = Instant::now();

@@ -288,12 +284,6 @@ where
     ///
     /// This will execute the transactions until all transactions have been processed or the task
     /// was cancelled.
-    #[instrument(
-        level = "debug",
-        target = "engine::tree::payload_processor::prewarm",
-        name = "prewarm",
-        skip_all
-    )]
     pub(super) fn run(
         self,
         pending: mpsc::Receiver<impl ExecutableTxFor<Evm> + Clone + Send + 'static>,

@@ -374,7 +364,6 @@ where
 {
     /// Splits this context into an evm, an evm config, metrics, and the atomic bool for terminating
     /// execution.
-    #[instrument(level = "debug", target = "engine::tree::payload_processor::prewarm", skip_all)]
     fn evm_for_ctx(self) -> Option<(EvmFor<Evm, impl Database>, PrewarmMetrics, Arc<AtomicBool>)> {
         let Self {
             env,

@@ -391,7 +380,7 @@ where
             Ok(provider) => provider,
             Err(err) => {
                 trace!(
-                    target: "engine::tree::payload_processor::prewarm",
+                    target: "engine::tree",
                     %err,
                     "Failed to build state provider in prewarm thread"
                 );

@@ -440,7 +429,6 @@ where
     ///
     /// Note: There are no ordering guarantees; this does not reflect the state produced by
     /// sequential execution.
-    #[instrument(level = "debug", target = "engine::tree::payload_processor::prewarm", skip_all)]
     fn transact_batch<Tx>(
         self,
         txs: mpsc::Receiver<IndexedTransaction<Tx>>,

@@ -451,15 +439,7 @@ where
     {
         let Some((mut evm, metrics, terminate_execution)) = self.evm_for_ctx() else { return };
 
-        while let Ok(IndexedTransaction { index, tx }) = {
-            let _enter = debug_span!(target: "engine::tree::payload_processor::prewarm", "recv tx")
-                .entered();
-            txs.recv()
-        } {
-            let _enter =
-                debug_span!(target: "engine::tree::payload_processor::prewarm", "prewarm tx", index, tx_hash=%tx.tx().tx_hash())
-                    .entered();
-
+        while let Ok(IndexedTransaction { index, tx }) = txs.recv() {
             // If the task was cancelled, stop execution, send an empty result to notify the task,
             // and exit.
             if terminate_execution.load(Ordering::Relaxed) {

@@ -487,18 +467,12 @@ where
             };
             metrics.execution_duration.record(start.elapsed());
 
-            drop(_enter);
-
             // Only send outcome for transactions after the first txn
             // as the main execution will be just as fast
             if index > 0 {
-                let _enter =
-                    debug_span!(target: "engine::tree::payload_processor::prewarm", "prewarm outcome", index, tx_hash=%tx.tx().tx_hash())
-                        .entered();
                 let (targets, storage_targets) = multiproof_targets_from_state(res.state);
                 metrics.prefetch_storage_targets.record(storage_targets as f64);
                 let _ = sender.send(PrewarmTaskEvent::Outcome { proof_targets: Some(targets) });
-                drop(_enter);
             }
 
             metrics.total_runtime.record(start.elapsed());

@@ -511,7 +485,6 @@ where
     /// Spawns a worker task for transaction execution and returns its sender channel.
     fn spawn_worker<Tx>(
         &self,
-        idx: usize,
         executor: &WorkloadExecutor,
         actions_tx: Sender<PrewarmTaskEvent>,
         done_tx: Sender<()>,

@@ -521,11 +494,8 @@ where
     {
         let (tx, rx) = mpsc::channel();
         let ctx = self.clone();
-        let span =
-            debug_span!(target: "engine::tree::payload_processor::prewarm", "prewarm worker", idx);
 
         executor.spawn_blocking(move || {
-            let _enter = span.entered();
             ctx.transact_batch(rx, actions_tx, done_tx);
         });

@@ -15,7 +15,7 @@ use std::{
     sync::mpsc,
     time::{Duration, Instant},
 };
-use tracing::{debug, debug_span, instrument, trace};
+use tracing::{debug, trace, trace_span};
 
 /// A task responsible for populating the sparse trie.
 pub(super) struct SparseTrieTask<BPF, A = SerialSparseTrie, S = SerialSparseTrie>

@@ -61,11 +61,6 @@ where
     ///
     /// - State root computation outcome.
     /// - `SparseStateTrie` that needs to be cleared and reused to avoid reallocations.
-    #[instrument(
-        level = "debug",
-        target = "engine::tree::payload_processor::sparse_trie",
-        skip_all
-    )]
     pub(super) fn run(
         mut self,
     ) -> (Result<StateRootComputeOutcome, ParallelStateRootError>, SparseStateTrie<A, S>) {

@@ -85,14 +80,10 @@ where
         while let Ok(mut update) = self.updates.recv() {
             num_iterations += 1;
             let mut num_updates = 1;
-            let _enter =
-                debug_span!(target: "engine::tree::payload_processor::sparse_trie", "drain updates")
-                    .entered();
             while let Ok(next) = self.updates.try_recv() {
                 update.extend(next);
                 num_updates += 1;
             }
-            drop(_enter);
 
             debug!(
                 target: "engine::root",

@@ -139,7 +130,6 @@ pub struct StateRootComputeOutcome {
 }
 
 /// Updates the sparse trie with the given proofs and state, and returns the elapsed time.
-#[instrument(level = "debug", target = "engine::tree::payload_processor::sparse_trie", skip_all)]
 pub(crate) fn update_sparse_trie<BPF, A, S>(
     trie: &mut SparseStateTrie<A, S>,
     SparseTrieUpdate { mut state, multiproof }: SparseTrieUpdate,

@@ -165,7 +155,6 @@ where
     );
 
     // Update storage slots with new values and calculate storage roots.
-    let span = tracing::Span::current();
     let (tx, rx) = mpsc::channel();
     state
         .storages

@@ -173,16 +162,14 @@ where
         .map(|(address, storage)| (address, storage, trie.take_storage_trie(&address)))
         .par_bridge()
         .map(|(address, storage, storage_trie)| {
-            let _enter =
-                debug_span!(target: "engine::tree::payload_processor::sparse_trie", parent: span.clone(), "storage trie", ?address)
-                    .entered();
-
-            trace!(target: "engine::tree::payload_processor::sparse_trie", "Updating storage");
+            let span = trace_span!(target: "engine::root::sparse", "Storage trie", ?address);
+            let _enter = span.enter();
+            trace!(target: "engine::root::sparse", "Updating storage");
             let storage_provider = blinded_provider_factory.storage_node_provider(address);
             let mut storage_trie = storage_trie.ok_or(SparseTrieErrorKind::Blind)?;
 
             if storage.wiped {
-                trace!(target: "engine::tree::payload_processor::sparse_trie", "Wiping storage");
+                trace!(target: "engine::root::sparse", "Wiping storage");
                 storage_trie.wipe()?;
             }

@@ -200,7 +187,7 @@ where
                     continue;
                 }
 
-                trace!(target: "engine::tree::payload_processor::sparse_trie", ?slot_nibbles, "Updating storage slot");
+                trace!(target: "engine::root::sparse", ?slot_nibbles, "Updating storage slot");
                 storage_trie.update_leaf(
                     slot_nibbles,
                     alloy_rlp::encode_fixed_size(&value).to_vec(),

@@ -232,9 +219,6 @@ where
     let mut removed_accounts = Vec::new();
 
     // Update account storage roots
-    let _enter =
-        tracing::debug_span!(target: "engine::tree::payload_processor::sparse_trie", "account trie")
-            .entered();
     for result in rx {
         let (address, storage_trie) = result?;
         trie.insert_storage_trie(address, storage_trie);

@@ -44,8 +44,9 @@ use reth_trie::{
 };
 use reth_trie_db::DatabaseHashedPostState;
 use reth_trie_parallel::root::{ParallelStateRoot, ParallelStateRootError};
+use revm::context::Block;
 use std::{collections::HashMap, sync::Arc, time::Instant};
-use tracing::{debug, debug_span, error, info, instrument, trace, warn};
+use tracing::{debug, debug_span, error, info, trace, warn};
 
 /// Context providing access to tree state during validation.
 ///

@@ -288,7 +289,7 @@ where
     V: PayloadValidator<T, Block = N::Block>,
 {
     debug!(
-        target: "engine::tree::payload_validator",
+        target: "engine::tree",
         ?execution_err,
         block = ?input.num_hash(),
         "Block execution failed, checking for header validation errors"

@@ -323,15 +324,6 @@ where
     /// - Block execution
     /// - State root computation
     /// - Fork detection
-    #[instrument(
-        level = "debug",
-        target = "engine::tree::payload_validator",
-        skip_all,
-        fields(
-            parent = ?input.parent_hash(),
-            block_num_hash = ?input.num_hash()
-        )
-    )]
     pub fn validate_block_with_state<T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>>(
         &mut self,
         input: BlockOrPayload<T>,

@@ -374,9 +366,7 @@ where
         let parent_hash = input.parent_hash();
         let block_num_hash = input.num_hash();
 
-        trace!(target: "engine::tree::payload_validator", "Fetching block state provider");
-        let _enter =
-            debug_span!(target: "engine::tree::payload_validator", "state provider").entered();
+        trace!(target: "engine::tree", block=?block_num_hash, parent=?parent_hash, "Fetching block state provider");
         let Some(provider_builder) =
             ensure_ok!(self.state_provider_builder(parent_hash, ctx.state()))
         else {

@@ -387,8 +377,8 @@ where
             )
             .into())
         };
 
         let state_provider = ensure_ok!(provider_builder.build());
-        drop(_enter);
 
         // fetch parent block
         let Some(parent_block) = ensure_ok!(self.sealed_header_by_hash(parent_hash, ctx.state()))

@@ -400,9 +390,7 @@ where
             .into())
         };
 
-        let evm_env = debug_span!(target: "engine::tree::payload_validator", "evm env")
-            .in_scope(|| self.evm_env_for(&input))
-            .map_err(NewPayloadError::other)?;
+        let evm_env = self.evm_env_for(&input).map_err(NewPayloadError::other)?;
 
         let env = ExecutionEnv { evm_env, hash: input.hash(), parent_hash: input.parent_hash() };
 
@@ -412,7 +400,8 @@ where
         let strategy = state_root_plan.strategy;
 
         debug!(
-            target: "engine::tree::payload_validator",
+            target: "engine::tree",
+            block=?block_num_hash,
             ?strategy,
             "Deciding which state root algorithm to run"
         );

@@ -428,6 +417,7 @@ where
             persisting_kind,
             parent_hash,
             ctx.state(),
+            block_num_hash,
             strategy,
         ));
 
@@ -462,7 +452,7 @@ where
             block
         );
 
-        debug!(target: "engine::tree::payload_validator", "Calculating block state root");
+        debug!(target: "engine::tree", block=?block_num_hash, "Calculating block state root");
 
         let root_time = Instant::now();
 
@@ -470,17 +460,17 @@ where
 
         match strategy {
             StateRootStrategy::StateRootTask => {
-                debug!(target: "engine::tree::payload_validator", "Using sparse trie state root algorithm");
+                debug!(target: "engine::tree", block=?block_num_hash, "Using sparse trie state root algorithm");
                 match handle.state_root() {
                     Ok(StateRootComputeOutcome { state_root, trie_updates }) => {
                         let elapsed = root_time.elapsed();
-                        info!(target: "engine::tree::payload_validator", ?state_root, ?elapsed, "State root task finished");
+                        info!(target: "engine::tree", ?state_root, ?elapsed, "State root task finished");
                         // we double check the state root here for good measure
                         if state_root == block.header().state_root() {
                             maybe_state_root = Some((state_root, trie_updates, elapsed))
                         } else {
                             warn!(
-                                target: "engine::tree::payload_validator",
+                                target: "engine::tree",
                                 ?state_root,
                                 block_state_root = ?block.header().state_root(),
                                 "State root task returned incorrect state root"

@@ -488,12 +478,12 @@ where
                         }
                     }
                     Err(error) => {
-                        debug!(target: "engine::tree::payload_validator", %error, "State root task failed");
+                        debug!(target: "engine::tree", %error, "State root task failed");
                     }
                 }
             }
             StateRootStrategy::Parallel => {
-                debug!(target: "engine::tree::payload_validator", "Using parallel state root algorithm");
+                debug!(target: "engine::tree", block=?block_num_hash, "Using parallel state root algorithm");
                 match self.compute_state_root_parallel(
                     persisting_kind,
                     block.parent_hash(),

@@ -503,7 +493,8 @@ where
                     Ok(result) => {
                         let elapsed = root_time.elapsed();
                         info!(
-                            target: "engine::tree::payload_validator",
+                            target: "engine::tree",
+                            block = ?block_num_hash,
                             regular_state_root = ?result.0,
                             ?elapsed,
                             "Regular root task finished"

@@ -511,7 +502,7 @@ where
                         maybe_state_root = Some((result.0, result.1, elapsed));
                     }
                     Err(error) => {
-                        debug!(target: "engine::tree::payload_validator", %error, "Parallel state root computation failed");
+                        debug!(target: "engine::tree", %error, "Parallel state root computation failed");
                     }
                 }
             }

@@ -528,9 +519,9 @@ where
         } else {
             // fallback is to compute the state root regularly in sync
             if self.config.state_root_fallback() {
-                debug!(target: "engine::tree::payload_validator", "Using state root fallback for testing");
+                debug!(target: "engine::tree", block=?block_num_hash, "Using state root fallback for testing");
             } else {
-                warn!(target: "engine::tree::payload_validator", ?persisting_kind, "Failed to compute state root in parallel");
+                warn!(target: "engine::tree", block=?block_num_hash, ?persisting_kind, "Failed to compute state root in parallel");
                 self.metrics.block_validation.state_root_parallel_fallback_total.increment(1);
             }

@@ -542,7 +533,7 @@ where
         };
 
         self.metrics.block_validation.record_state_root(&trie_output, root_elapsed.as_secs_f64());
-        debug!(target: "engine::tree::payload_validator", ?root_elapsed, "Calculated state root");
+        debug!(target: "engine::tree", ?root_elapsed, block=?block_num_hash, "Calculated state root");
 
         // ensure state root matches
         if state_root != block.header().state_root() {

@@ -596,12 +587,12 @@ where
     /// and block body itself.
     fn validate_block_inner(&self, block: &RecoveredBlock<N::Block>) -> Result<(), ConsensusError> {
         if let Err(e) = self.consensus.validate_header(block.sealed_header()) {
-            error!(target: "engine::tree::payload_validator", ?block, "Failed to validate header {}: {e}", block.hash());
+            error!(target: "engine::tree", ?block, "Failed to validate header {}: {e}", block.hash());
             return Err(e)
         }
 
         if let Err(e) = self.consensus.validate_block_pre_execution(block.sealed_block()) {
-            error!(target: "engine::tree::payload_validator", ?block, "Failed to validate block {}: {e}", block.hash());
+            error!(target: "engine::tree", ?block, "Failed to validate block {}: {e}", block.hash());
             return Err(e)
         }
 
@@ -609,7 +600,6 @@ where
     }
 
     /// Executes a block with the given state provider
-    #[instrument(level = "debug", target = "engine::tree::payload_validator", skip_all)]
     fn execute_block<S, Err, T>(
         &mut self,
         state_provider: S,

@@ -624,7 +614,11 @@ where
         T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>,
         Evm: ConfigureEngineEvm<T::ExecutionData, Primitives = N>,
     {
-        debug!(target: "engine::tree::payload_validator", "Executing block");
+        let num_hash = NumHash::new(env.evm_env.block_env.number().to(), env.hash);
+
+        let span = debug_span!(target: "engine::tree", "execute_block", num = ?num_hash.number, hash = ?num_hash.hash);
+        let _enter = span.enter();
+        debug!(target: "engine::tree", "Executing block");
 
         let mut db = State::builder()
             .with_database(StateProviderDatabase::new(&state_provider))

@@ -663,7 +657,7 @@ where
         )?;
         let execution_finish = Instant::now();
         let execution_time = execution_finish.duration_since(execution_start);
-        debug!(target: "engine::tree::payload_validator", elapsed = ?execution_time, "Executed block");
+        debug!(target: "engine::tree", elapsed = ?execution_time, number=?num_hash.number, "Executed block");
         Ok(output)
     }

@@ -675,7 +669,6 @@ where
     /// Returns `Err(_)` if error was encountered during computation.
     /// `Err(ProviderError::ConsistentView(_))` can be safely ignored and fallback computation
     /// should be used instead.
-    #[instrument(level = "debug", target = "engine::tree::payload_validator", skip_all)]
     fn compute_state_root_parallel(
         &self,
         persisting_kind: PersistingKind,

@@ -716,7 +709,7 @@ where
     {
         let start = Instant::now();
 
-        trace!(target: "engine::tree::payload_validator", block=?block.num_hash(), "Validating block consensus");
+        trace!(target: "engine::tree", block=?block.num_hash(), "Validating block consensus");
         // validate block consensus rules
         if let Err(e) = self.validate_block_inner(block) {
             return Err(e.into())

@@ -726,7 +719,7 @@ where
         if let Err(e) =
             self.consensus.validate_header_against_parent(block.sealed_header(), parent_block)
         {
-            warn!(target: "engine::tree::payload_validator", ?block, "Failed to validate header {} against parent: {e}", block.hash());
+            warn!(target: "engine::tree", ?block, "Failed to validate header {} against parent: {e}", block.hash());
             return Err(e.into())
         }
 
@@ -766,12 +759,6 @@ where
     /// The method handles strategy fallbacks if the preferred approach fails, ensuring
     /// block execution always completes with a valid state root.
     #[allow(clippy::too_many_arguments)]
-    #[instrument(
-        level = "debug",
-        target = "engine::tree::payload_validator",
-        skip_all,
-        fields(strategy)
-    )]
     fn spawn_payload_processor<T: ExecutableTxIterator<Evm>>(
         &mut self,
         env: ExecutionEnv<Evm>,

@@ -780,6 +767,7 @@ where
         persisting_kind: PersistingKind,
         parent_hash: B256,
         state: &EngineApiTreeState<N>,
+        block_num_hash: NumHash,
         strategy: StateRootStrategy,
     ) -> Result<
         (

@@ -833,7 +821,8 @@ where
                 Err((error, txs, env, provider_builder)) => {
                     // Failed to spawn proof workers, fallback to parallel state root
                     error!(
-                        target: "engine::tree::payload_validator",
+                        target: "engine::tree",
+                        block=?block_num_hash,
                         ?error,
                         "Failed to spawn proof workers, falling back to parallel state root"
                     );

@@ -851,7 +840,8 @@ where
             // prewarming for transaction execution
         } else {
             debug!(
-                target: "engine::tree::payload_validator",
+                target: "engine::tree",
+                block=?block_num_hash,
                 "Disabling state root task due to non-empty prefix sets"
             );
             (

@@ -894,7 +884,7 @@ where
         state: &EngineApiTreeState<N>,
     ) -> ProviderResult<Option<StateProviderBuilder<N, P>>> {
         if let Some((historical, blocks)) = state.tree_state.blocks_by_hash(hash) {
-            debug!(target: "engine::tree::payload_validator", %hash, %historical, "found canonical state for block in memory, creating provider builder");
+            debug!(target: "engine::tree", %hash, %historical, "found canonical state for block in memory, creating provider builder");
             // the block leads back to the canonical chain
             return Ok(Some(StateProviderBuilder::new(
                 self.provider.clone(),

@@ -905,18 +895,17 @@ where
 
         // Check if the block is persisted
         if let Some(header) = self.provider.header(hash)? {
-            debug!(target: "engine::tree::payload_validator", %hash, number = %header.number(), "found canonical state for block in database, creating provider builder");
+            debug!(target: "engine::tree", %hash, number = %header.number(), "found canonical state for block in database, creating provider builder");
             // For persisted blocks, we create a builder that will fetch state directly from the
             // database
             return Ok(Some(StateProviderBuilder::new(self.provider.clone(), hash, None)))
         }
 
-        debug!(target: "engine::tree::payload_validator", %hash, "no canonical state found for block");
+        debug!(target: "engine::tree", %hash, "no canonical state found for block");
         Ok(None)
     }
 
     /// Determines the state root computation strategy based on persistence state and configuration.
-    #[instrument(level = "debug", target = "engine::tree::payload_validator", skip_all)]
     fn plan_state_root_computation<T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>>(
         &self,
         input: &BlockOrPayload<T>,

@@ -950,7 +939,7 @@ where
         };
 
         debug!(
-            target: "engine::tree::payload_validator",
+            target: "engine::tree",
             block=?input.num_hash(),
             ?strategy,
             "Planned state root computation strategy"

@@ -990,12 +979,6 @@ where
     ///    block.
     /// 3. Once in-memory blocks are collected and optionally filtered, we compute the
     ///    [`HashedPostState`] from them.
-    #[instrument(
-        level = "debug",
-        target = "engine::tree::payload_validator",
-        skip_all,
-        fields(persisting_kind, parent_hash)
-    )]
     fn compute_trie_input<TP: DBProvider + BlockNumReader + TrieReader>(
         &self,
         persisting_kind: PersistingKind,

@@ -1016,9 +999,6 @@ where
 
         // If the current block is a descendant of the currently persisting blocks, then we need to
        // filter in-memory blocks, so that none of them are already persisted in the database.
-        let _enter =
-            debug_span!(target: "engine::tree::payload_validator", "filter in-memory blocks", len = blocks.len())
-                .entered();
         if persisting_kind.is_descendant() {
             // Iterate over the blocks from oldest to newest.
             while let Some(block) = blocks.last() {

@@ -1043,13 +1023,11 @@ where
                 parent_hash.into()
             };
         }
-        drop(_enter);
 
-        let blocks_empty = blocks.is_empty();
-        if blocks_empty {
-            debug!(target: "engine::tree::payload_validator", "Parent found on disk");
+        if blocks.is_empty() {
+            debug!(target: "engine::tree", %parent_hash, "Parent found on disk");
         } else {
-            debug!(target: "engine::tree::payload_validator", %historical, blocks = blocks.len(), "Parent found in memory");
+            debug!(target: "engine::tree", %parent_hash, %historical, blocks = blocks.len(), "Parent found in memory");
         }
 
         // Convert the historical block to the block number.

@@ -1057,15 +1035,12 @@ where
             .convert_hash_or_number(historical)?
             .ok_or_else(|| ProviderError::BlockHashNotFound(historical.as_hash().unwrap()))?;
 
-        let _enter =
-            debug_span!(target: "engine::tree::payload_validator", "revert state", blocks_empty)
-                .entered();
         // Retrieve revert state for historical block.
         let (revert_state, revert_trie) = if block_number == best_block_number {
             // We do not check against the `last_block_number` here because
             // `HashedPostState::from_reverts` / `trie_reverts` only use the database tables, and
             // not static files.
-            debug!(target: "engine::tree::payload_validator", block_number, best_block_number, "Empty revert state");
+            debug!(target: "engine::tree", block_number, best_block_number, "Empty revert state");
             (HashedPostState::default(), TrieUpdatesSorted::default())
         } else {
             let revert_state = HashedPostState::from_reverts::<KeccakKeyHasher>(

@@ -1075,7 +1050,7 @@ where
             .map_err(ProviderError::from)?;
             let revert_trie = provider.trie_reverts(block_number + 1)?;
             debug!(
-                target: "engine::tree::payload_validator",
+                target: "engine::tree",
                 block_number,
                 best_block_number,
                 accounts = revert_state.accounts.len(),

@@ -58,7 +58,7 @@ impl Decoder for ECIESCodec {
     type Item = IngressECIESValue;
     type Error = ECIESError;
 
-    #[instrument(skip_all, fields(peer=?self.ecies.remote_id, state=?self.state))]
+    #[instrument(level = "trace", skip_all, fields(peer=?self.ecies.remote_id, state=?self.state))]
     fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
         loop {
             match self.state {

@@ -150,7 +150,7 @@ impl Decoder for ECIESCodec {
 impl Encoder<EgressECIESValue> for ECIESCodec {
     type Error = io::Error;
 
-    #[instrument(skip(self, buf), fields(peer=?self.ecies.remote_id, state=?self.state))]
+    #[instrument(level = "trace", skip(self, buf), fields(peer=?self.ecies.remote_id, state=?self.state))]
     fn encode(&mut self, item: EgressECIESValue, buf: &mut BytesMut) -> Result<(), Self::Error> {
         match item {
             EgressECIESValue::Auth => {

@@ -45,7 +45,7 @@ where
         PrunePurpose::User
     }
 
-    #[instrument(target = "pruner", skip(self, provider), ret(level = "trace"))]
+    #[instrument(level = "trace", target = "pruner", skip(self, provider), ret)]
     fn prune(&self, provider: &Provider, input: PruneInput) -> Result<SegmentOutput, PrunerError> {
         let range = match input.get_next_block_range() {
             Some(range) => range,

@@ -42,7 +42,7 @@ where
         PrunePurpose::User
     }
 
-    #[instrument(target = "pruner", skip(self, provider), ret(level = "trace"))]
+    #[instrument(level = "trace", target = "pruner", skip(self, provider), ret)]
     fn prune(&self, provider: &Provider, input: PruneInput) -> Result<SegmentOutput, PrunerError> {
         crate::segments::receipts::prune(provider, input)
     }

@@ -45,7 +45,7 @@ where
         PrunePurpose::User
     }
 
-    #[instrument(target = "pruner", skip(self, provider), ret(level = "trace"))]
+    #[instrument(level = "trace", target = "pruner", skip(self, provider), ret)]
     fn prune(&self, provider: &Provider, input: PruneInput) -> Result<SegmentOutput, PrunerError> {
         // Contract log filtering removes every receipt possible except the ones in the list. So,
         // for the other receipts it's as if they had a `PruneMode::Distance()` of

@@ -37,7 +37,7 @@ where
         PrunePurpose::User
     }
 
-    #[instrument(target = "pruner", skip(self, provider), ret(level = "trace"))]
+    #[instrument(level = "trace", target = "pruner", skip(self, provider), ret)]
     fn prune(&self, provider: &Provider, input: PruneInput) -> Result<SegmentOutput, PrunerError> {
         let tx_range = match input.get_next_tx_num_range(provider)? {
             Some(range) => range,

@@ -47,7 +47,7 @@ where
         PrunePurpose::User
     }
 
-    #[instrument(target = "pruner", skip(self, provider), ret(level = "trace"))]
+    #[instrument(level = "trace", target = "pruner", skip(self, provider), ret)]
     fn prune(&self, provider: &Provider, input: PruneInput) -> Result<SegmentOutput, PrunerError> {
         let range = match input.get_next_block_range() {
             Some(range) => range,

@@ -38,7 +38,7 @@ where
         PrunePurpose::User
     }
 
-    #[instrument(target = "pruner", skip(self, provider), ret(level = "trace"))]
+    #[instrument(level = "trace", target = "pruner", skip(self, provider), ret)]
     fn prune(
         &self,
         provider: &Provider,

@@ -27,7 +27,7 @@ pub(crate) struct Batch<S> {
 // Batch responses must be sent back as a single message so we read the results from each
 // request in the batch and read the results off of a new channel, `rx_batch`, and then send the
 // complete batch response back to the client over `tx`.
-#[instrument(name = "batch", skip(b))]
+#[instrument(name = "batch", skip(b), level = "TRACE")]
 pub(crate) async fn process_batch_request<S>(
     b: Batch<S>,
     max_response_body_size: usize,

@@ -98,7 +98,7 @@ where
     }
 }
 
-#[instrument(name = "method_call", fields(method = req.method.as_ref()), skip(req, rpc_service))]
+#[instrument(name = "method_call", fields(method = req.method.as_ref()), skip(req, rpc_service), level = "TRACE")]
pub(crate) async fn execute_call_with_tracing<'a, S>(
     req: Request<'a>,
     rpc_service: &S,

@@ -443,7 +443,7 @@ struct ProcessConnection<'a, HttpMiddleware, RpcMiddleware> {
 }
 
 /// Spawns the IPC connection onto a new task
-#[instrument(name = "connection", skip_all, fields(conn_id = %params.conn_id))]
+#[instrument(name = "connection", skip_all, fields(conn_id = %params.conn_id), level = "INFO")]
 fn process_connection<RpcMiddleware, HttpMiddleware>(
     params: ProcessConnection<'_, HttpMiddleware, RpcMiddleware>,
 ) where

@@ -16,7 +16,7 @@ use tracing_futures::Instrument;
 
 macro_rules! engine_span {
     () => {
-        tracing::info_span!(target: "rpc", "engine")
+        tracing::trace_span!(target: "rpc", "engine")
     };
 }

@@ -20,7 +20,7 @@ use std::{
     collections::HashMap,
     ops::{RangeBounds, RangeInclusive},
 };
-use tracing::{debug, instrument};
+use tracing::debug;
 
 /// Extends [`StateRoot`] with operations specific for working with a database transaction.
 pub trait DatabaseStateRoot<'a, TX>: Sized {

@@ -226,7 +226,6 @@ impl<'a, TX: DbTx> DatabaseStateRoot<'a, TX>
 }
 
 impl<TX: DbTx> DatabaseHashedPostState<TX> for HashedPostState {
-    #[instrument(target = "trie::db", skip(tx), fields(range))]
     fn from_reverts<KH: KeyHasher>(
         tx: &TX,
         range: impl RangeBounds<BlockNumber>,

@@ -693,7 +693,7 @@ where
             multi_added_removed_keys.unwrap_or_else(|| Arc::new(MultiAddedRemovedKeys::new()));
         let added_removed_keys = multi_added_removed_keys.get_storage(&hashed_address);
 
-        let span = tracing::info_span!(
+        let span = tracing::trace_span!(
             target: "trie::proof_task",
             "Storage proof calculation",
             hashed_address = ?hashed_address,

@@ -741,24 +741,13 @@ impl SparseTrieInterface for ParallelSparseTrie {
         // Update subtrie hashes in parallel
         {
             use rayon::iter::{IntoParallelIterator, ParallelIterator};
-            use tracing::info_span;
 
             let (tx, rx) = mpsc::channel();
 
             let branch_node_tree_masks = &self.branch_node_tree_masks;
             let branch_node_hash_masks = &self.branch_node_hash_masks;
-            let span = tracing::Span::current();
             changed_subtries
                 .into_par_iter()
                 .map(|mut changed_subtrie| {
-                    let _enter = info_span!(
-                        target: "trie::sparse::parallel",
-                        parent: span.clone(),
-                        "subtrie",
-                        index = changed_subtrie.index
-                    )
-                    .entered();
-
                     #[cfg(feature = "metrics")]
                     let start = std::time::Instant::now();
                     changed_subtrie.subtrie.update_hashes(

@@ -1303,7 +1292,6 @@ impl ParallelSparseTrie {
 
     /// Drains any [`SparseTrieUpdatesAction`]s from the given subtrie, and applies each action to
     /// the given `updates` set. If the given set is None then this is a no-op.
-    #[instrument(target = "trie::sparse::parallel", skip_all)]
     fn apply_subtrie_update_actions(
         &mut self,
         update_actions: impl Iterator<Item = SparseTrieUpdatesAction>,

@@ -1327,7 +1315,7 @@ impl ParallelSparseTrie {
     }
 
     /// Updates hashes for the upper subtrie, using nodes from both upper and lower subtries.
-    #[instrument(target = "trie::parallel_sparse", skip_all, ret(level = "trace"))]
+    #[instrument(level = "trace", target = "trie::parallel_sparse", skip_all, ret)]
     fn update_upper_subtrie_hashes(&mut self, prefix_set: &mut PrefixSet) -> RlpNode {
         trace!(target: "trie::parallel_sparse", "Updating upper subtrie hashes");
 
@@ -1405,7 +1393,6 @@ impl ParallelSparseTrie {
     ///
     /// IMPORTANT: The method removes the subtries from `lower_subtries`, and the caller is
     /// responsible for returning them back into the array.
-    #[instrument(target = "trie::sparse::parallel", skip_all, fields(prefix_set_len = prefix_set.len()))]
     fn take_changed_lower_subtries(
         &mut self,
         prefix_set: &mut PrefixSet,

@@ -1562,7 +1549,6 @@ impl ParallelSparseTrie {
 
     /// Return updated subtries back to the trie after executing any actions required on the
     /// top-level `SparseTrieUpdates`.
-    #[instrument(target = "trie::sparse::parallel", skip_all)]
     fn insert_changed_subtries(
         &mut self,
         changed_subtries: impl IntoIterator<Item = ChangedSubtrie>,

@@ -2050,7 +2036,7 @@ impl SparseSubtrie {
     /// # Panics
     ///
     /// If the node at the root path does not exist.
-    #[instrument(target = "trie::parallel_sparse", skip_all, fields(root = ?self.path), ret(level = "trace"))]
+    #[instrument(level = "trace", target = "trie::parallel_sparse", skip_all, fields(root = ?self.path), ret)]
     fn update_hashes(
         &mut self,
         prefix_set: &mut PrefixSet,

@@ -16,7 +16,7 @@ workspace = true
 reth-primitives-traits.workspace = true
 reth-execution-errors.workspace = true
 reth-trie-common.workspace = true
-tracing = { workspace = true, features = ["attributes"] }
+tracing.workspace = true
 alloy-trie.workspace = true
 
 # alloy

@@ -18,7 +18,7 @@ use reth_trie_common::{
     DecodedMultiProof, DecodedStorageMultiProof, MultiProof, Nibbles, RlpNode, StorageMultiProof,
     TrieAccount, TrieMask, TrieNode, EMPTY_ROOT_HASH, TRIE_ACCOUNT_RLP_MAX_SIZE,
 };
-use tracing::{instrument, trace};
+use tracing::trace;
 
 /// Provides type-safe re-use of cleared [`SparseStateTrie`]s, which helps to save allocations
 /// across payload runs.

@@ -208,14 +208,6 @@ where
 
     /// Reveal unknown trie paths from decoded multiproof.
     /// NOTE: This method does not extensively validate the proof.
-    #[instrument(
-        target = "trie::sparse",
-        skip_all,
-        fields(
-            account_nodes = multiproof.account_subtree.len(),
-            storages = multiproof.storages.len()
-        )
-    )]
     pub fn reveal_decoded_multiproof(
         &mut self,
         multiproof: DecodedMultiProof,

@@ -540,7 +532,6 @@ where
     /// Calculates the hashes of subtries.
     ///
     /// If the trie has not been revealed, this function does nothing.
-    #[instrument(target = "trie::sparse", skip_all)]
     pub fn calculate_subtries(&mut self) {
         if let SparseTrie::Revealed(trie) = &mut self.state {
             trie.update_subtrie_hashes();

@@ -601,7 +592,6 @@ where
     }
 
     /// Returns sparse trie root and trie updates if the trie has been revealed.
-    #[instrument(target = "trie::sparse", skip_all)]
     pub fn root_with_updates(
         &mut self,
         provider_factory: impl TrieNodeProviderFactory,

@@ -705,7 +695,6 @@ where
     ///
     /// Returns false if the new account info and storage trie are empty, indicating the account
     /// leaf should be removed.
-    #[instrument(target = "trie::sparse", skip_all)]
     pub fn update_account(
         &mut self,
         address: B256,

@@ -748,7 +737,6 @@ where
     ///
     /// Returns false if the new storage root is empty, and the account info was already empty,
     /// indicating the account leaf should be removed.
-    #[instrument(target = "trie::sparse", skip_all)]
     pub fn update_account_storage_root(
         &mut self,
         address: B256,

@@ -796,7 +784,6 @@ where
     }
 
     /// Remove the account leaf node.
-    #[instrument(target = "trie::sparse", skip_all)]
     pub fn remove_account_leaf(
         &mut self,
         path: &Nibbles,

@@ -24,7 +24,7 @@ use reth_trie_common::{
     TrieNode, CHILD_INDEX_RANGE, EMPTY_ROOT_HASH,
 };
 use smallvec::SmallVec;
-use tracing::{debug, instrument, trace};
+use tracing::{debug, trace};
 
 /// The level below which the sparse trie hashes are calculated in
 /// [`SerialSparseTrie::update_subtrie_hashes`].

@@ -175,7 +175,6 @@ impl<T: SparseTrieInterface> SparseTrie<T> {
     /// and resetting the trie to only contain an empty root node.
     ///
     /// Note: This method will error if the trie is blinded.
-    #[instrument(target = "trie::sparse", skip_all)]
     pub fn wipe(&mut self) -> SparseTrieResult<()> {
         let revealed = self.as_revealed_mut().ok_or(SparseTrieErrorKind::Blind)?;
         revealed.wipe();

@@ -192,7 +191,6 @@ impl<T: SparseTrieInterface> SparseTrie<T> {
     ///
     /// - `Some(B256)` with the calculated root hash if the trie is revealed.
     /// - `None` if the trie is still blind.
-    #[instrument(target = "trie::sparse", skip_all)]
     pub fn root(&mut self) -> Option<B256> {
         Some(self.as_revealed_mut()?.root())
     }

@@ -232,7 +230,6 @@ impl<T: SparseTrieInterface> SparseTrie<T> {
     /// # Errors
     ///
     /// Returns an error if the trie is still blind, or if the update fails.
-    #[instrument(target = "trie::sparse", skip_all)]
     pub fn update_leaf(
         &mut self,
         path: Nibbles,

@@ -249,7 +246,6 @@ impl<T: SparseTrieInterface> SparseTrie<T> {
     /// # Errors
     ///
     /// Returns an error if the trie is still blind, or if the leaf cannot be removed
-    #[instrument(target = "trie::sparse", skip_all)]
     pub fn remove_leaf(
         &mut self,
         path: &Nibbles,

@@ -593,13 +589,14 @@ impl SparseTrieInterface for SerialSparseTrie {
         Ok(())
     }
 
-    #[instrument(target = "trie::sparse::serial", skip(self, provider))]
     fn update_leaf<P: TrieNodeProvider>(
         &mut self,
         full_path: Nibbles,
         value: Vec<u8>,
         provider: P,
     ) -> SparseTrieResult<()> {
+        trace!(target: "trie::sparse", ?full_path, ?value, "update_leaf called");
+
         self.prefix_set.insert(full_path);
         let existing = self.values.insert(full_path, value);
         if existing.is_some() {

@@ -731,7 +728,6 @@ impl SparseTrieInterface for SerialSparseTrie {
         Ok(())
     }
 
-    #[instrument(target = "trie::sparse::serial", skip(self, provider))]
     fn remove_leaf<P: TrieNodeProvider>(
         &mut self,
         full_path: &Nibbles,

@@ -917,7 +913,6 @@ impl SparseTrieInterface for SerialSparseTrie {
         Ok(())
     }
 
-    #[instrument(target = "trie::sparse::serial", skip(self))]
     fn root(&mut self) -> B256 {
         // Take the current prefix set
         let mut prefix_set = core::mem::take(&mut self.prefix_set).freeze();

@@ -1353,7 +1348,6 @@ impl SerialSparseTrie {
     ///
     /// This function identifies all nodes that have changed (based on the prefix set) at the given
     /// depth and recalculates their RLP representation.
-    #[instrument(target = "trie::sparse::serial", skip(self))]
     pub fn update_rlp_node_level(&mut self, depth: usize) {
         // Take the current prefix set
         let mut prefix_set = core::mem::take(&mut self.prefix_set).freeze();

@@ -1399,7 +1393,6 @@ impl SerialSparseTrie {
     ///    specified depth.
     /// - A `PrefixSetMut` containing paths shallower than the specified depth that still need to be
     ///   tracked for future updates.
-    #[instrument(target = "trie::sparse::serial", skip(self))]
     fn get_changed_nodes_at_depth(
         &self,
         prefix_set: &mut PrefixSet,

@@ -1486,7 +1479,6 @@ impl SerialSparseTrie {
     /// # Panics
     ///
     /// If the node at provided path does not exist.
-    #[instrument(target = "trie::sparse::serial", skip_all, ret(level = "trace"))]
     pub fn rlp_node(
         &mut self,
         prefix_set: &mut PrefixSet,

@@ -107,7 +107,7 @@ impl<T> MockHashedCursor<T> {
 impl<T: Debug + Clone> HashedCursor for MockHashedCursor<T> {
     type Value = T;
 
-    #[instrument(skip(self), ret(level = "trace"))]
+    #[instrument(level = "trace", skip(self), ret)]
     fn seek(&mut self, key: B256) -> Result<Option<(B256, Self::Value)>, DatabaseError> {
         // Find the first key that is greater than or equal to the given key.
         let entry = self.values.iter().find_map(|(k, v)| (k >= &key).then(|| (*k, v.clone())));

@@ -121,7 +121,7 @@ impl<T: Debug + Clone> HashedCursor for MockHashedCursor<T> {
         Ok(entry)
     }
 
-    #[instrument(skip(self), ret(level = "trace"))]
+    #[instrument(level = "trace", skip(self), ret)]
     fn next(&mut self) -> Result<Option<(B256, Self::Value)>, DatabaseError> {
         let mut iter = self.values.iter();
         // Jump to the first key that has a prefix of the current key if it's set, or to the first

@@ -191,10 +191,11 @@ where
     ///
     /// NOTE: The iteration will start from the key of the previous hashed entry if it was supplied.
     #[instrument(
+        level = "trace",
         target = "trie::node_iter",
         skip_all,
         fields(trie_type = ?self.trie_type),
-        ret(level = "trace")
+        ret
     )]
     pub fn try_next(
         &mut self,

@@ -109,7 +109,7 @@ impl MockTrieCursor {
 }
 
 impl TrieCursor for MockTrieCursor {
-    #[instrument(skip(self), ret(level = "trace"))]
+    #[instrument(level = "trace", skip(self), ret)]
     fn seek_exact(
         &mut self,
         key: Nibbles,

@@ -125,7 +125,7 @@ impl TrieCursor for MockTrieCursor {
         Ok(entry)
     }
 
-    #[instrument(skip(self), ret(level = "trace"))]
+    #[instrument(level = "trace", skip(self), ret)]
     fn seek(
         &mut self,
         key: Nibbles,

@@ -142,7 +142,7 @@ impl TrieCursor for MockTrieCursor {
         Ok(entry)
     }
 
-    #[instrument(skip(self), ret(level = "trace"))]
+    #[instrument(level = "trace", skip(self), ret)]
     fn next(&mut self) -> Result<Option<(Nibbles, BranchNodeCompact)>, DatabaseError> {
         let mut iter = self.trie_nodes.iter();
         // Jump to the first key that has a prefix of the current key if it's set, or to the first

@@ -161,7 +161,7 @@ impl TrieCursor for MockTrieCursor {
         Ok(entry)
     }
 
-    #[instrument(skip(self), ret(level = "trace"))]
+    #[instrument(level = "trace", skip(self), ret)]
     fn current(&mut self) -> Result<Option<Nibbles>, DatabaseError> {
         Ok(self.current_key)
     }

@@ -157,7 +157,7 @@ impl<C: TrieCursor, K: AsRef<AddedRemovedKeys>> TrieWalker<C, K> {
     }
 
     /// Returns the next unprocessed key in the trie along with its raw [`Nibbles`] representation.
-    #[instrument(skip(self), ret(level = "trace"))]
+    #[instrument(level = "trace", skip(self), ret)]
     pub fn next_unprocessed_key(&self) -> Option<(B256, Nibbles)> {
         self.key()
             .and_then(|key| if self.can_skip_current_node { key.increment() } else { Some(*key) })

@@ -297,7 +297,7 @@ impl<C: TrieCursor, K: AsRef<AddedRemovedKeys>> TrieWalker<C, K> {
     }
 
     /// Consumes the next node in the trie, updating the stack.
-    #[instrument(skip(self), ret(level = "trace"))]
+    #[instrument(level = "trace", skip(self), ret)]
     fn consume_node(&mut self) -> Result<(), DatabaseError> {
         let Some((key, node)) = self.node(false)? else {
             // If no next node is found, clear the stack.

@@ -343,7 +343,7 @@ impl<C: TrieCursor, K: AsRef<AddedRemovedKeys>> TrieWalker<C, K> {
     }
 
     /// Moves to the next sibling node in the trie, updating the stack.
-    #[instrument(skip(self), ret(level = "trace"))]
+    #[instrument(level = "trace", skip(self), ret)]
     fn move_to_next_sibling(
         &mut self,
         allow_root_to_child_nibble: bool,