mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-04-30 03:01:58 -04:00
Compare commits
1 Commits
push
...
matt/fix-t
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
4b9244c7d0 |
@@ -318,34 +318,92 @@ where
|
||||
let now = Instant::now();
|
||||
|
||||
let mut finished_state_updates = false;
|
||||
let mut iteration = 0u64;
|
||||
let mut last_log = Instant::now();
|
||||
|
||||
loop {
|
||||
iteration += 1;
|
||||
|
||||
// Periodic status log every 2 seconds
|
||||
if last_log.elapsed() > Duration::from_secs(2) {
|
||||
let pending_storage: usize = self.storage_updates.values().map(|v| v.len()).sum();
|
||||
let pending_storage_accounts =
|
||||
self.storage_updates.values().filter(|v| !v.is_empty()).count();
|
||||
tracing::warn!(
|
||||
target: "engine::tree::payload_processor::sparse_trie",
|
||||
iteration,
|
||||
elapsed_ms = now.elapsed().as_millis(),
|
||||
finished_state_updates,
|
||||
account_updates = self.account_updates.len(),
|
||||
pending_storage,
|
||||
pending_storage_accounts,
|
||||
pending_accounts = self.pending_account_updates.len(),
|
||||
proof_result_rx_len = self.proof_result_rx.len(),
|
||||
updates_rx_len = self.updates.len(),
|
||||
"SparseTrieCacheTask run loop status"
|
||||
);
|
||||
last_log = Instant::now();
|
||||
}
|
||||
|
||||
crossbeam_channel::select_biased! {
|
||||
recv(self.proof_result_rx) -> message => {
|
||||
let Ok(result) = message else {
|
||||
unreachable!("we own the sender half")
|
||||
};
|
||||
debug!(
|
||||
target: "engine::tree::payload_processor::sparse_trie",
|
||||
iteration,
|
||||
"Received proof result from proof_result_rx"
|
||||
);
|
||||
self.on_proof_result(result)?;
|
||||
},
|
||||
recv(self.updates) -> message => {
|
||||
let update = match message {
|
||||
Ok(m) => m,
|
||||
Err(_) => {
|
||||
debug!(
|
||||
target: "engine::tree::payload_processor::sparse_trie",
|
||||
iteration,
|
||||
"Updates channel closed, breaking loop"
|
||||
);
|
||||
break
|
||||
}
|
||||
};
|
||||
|
||||
match update {
|
||||
MultiProofMessage::PrefetchProofs(targets) => {
|
||||
debug!(
|
||||
target: "engine::tree::payload_processor::sparse_trie",
|
||||
iteration,
|
||||
"Received PrefetchProofs"
|
||||
);
|
||||
self.on_prewarm_targets(targets);
|
||||
}
|
||||
MultiProofMessage::StateUpdate(_, state) => {
|
||||
debug!(
|
||||
target: "engine::tree::payload_processor::sparse_trie",
|
||||
iteration,
|
||||
accounts = state.len(),
|
||||
"Received StateUpdate"
|
||||
);
|
||||
self.on_state_update(state);
|
||||
}
|
||||
MultiProofMessage::EmptyProof { sequence_number: _, state } => {
|
||||
debug!(
|
||||
target: "engine::tree::payload_processor::sparse_trie",
|
||||
iteration,
|
||||
"Received EmptyProof"
|
||||
);
|
||||
self.on_hashed_state_update(state);
|
||||
}
|
||||
MultiProofMessage::BlockAccessList(_) => todo!(),
|
||||
MultiProofMessage::FinishedStateUpdates => {
|
||||
debug!(
|
||||
target: "engine::tree::payload_processor::sparse_trie",
|
||||
iteration,
|
||||
elapsed_ms = now.elapsed().as_millis(),
|
||||
"Received FinishedStateUpdates"
|
||||
);
|
||||
finished_state_updates = true;
|
||||
}
|
||||
}
|
||||
@@ -354,15 +412,19 @@ where
|
||||
|
||||
self.process_updates()?;
|
||||
|
||||
if finished_state_updates &&
|
||||
self.account_updates.is_empty() &&
|
||||
self.storage_updates.iter().all(|(_, updates)| updates.is_empty())
|
||||
{
|
||||
let storage_empty = self.storage_updates.iter().all(|(_, updates)| updates.is_empty());
|
||||
if finished_state_updates && self.account_updates.is_empty() && storage_empty {
|
||||
debug!(
|
||||
target: "engine::tree::payload_processor::sparse_trie",
|
||||
iteration,
|
||||
elapsed_ms = now.elapsed().as_millis(),
|
||||
"Exit condition met, breaking loop"
|
||||
);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
debug!(target: "engine::root", "All proofs processed, ending calculation");
|
||||
debug!(target: "engine::root", iteration, elapsed_ms = now.elapsed().as_millis(), "All proofs processed, ending calculation");
|
||||
|
||||
let start = Instant::now();
|
||||
let (state_root, trie_updates) =
|
||||
@@ -460,6 +522,14 @@ where
|
||||
unreachable!("sparse trie as cache must only be used with multiproof v2");
|
||||
};
|
||||
|
||||
debug!(
|
||||
target: "engine::tree::payload_processor::sparse_trie",
|
||||
account_proofs = result.account_proofs.len(),
|
||||
storage_accounts = result.storage_proofs.len(),
|
||||
storage_proofs_total = result.storage_proofs.values().map(|v| v.len()).sum::<usize>(),
|
||||
"on_proof_result: revealing multiproof"
|
||||
);
|
||||
|
||||
self.trie.reveal_decoded_multiproof_v2(result).map_err(|e| {
|
||||
ParallelStateRootError::Other(format!("could not reveal multiproof: {e:?}"))
|
||||
})
|
||||
@@ -468,33 +538,44 @@ where
|
||||
/// Applies updates to the sparse trie and dispatches requested multiproof targets.
|
||||
fn process_updates(&mut self) -> Result<(), ProviderError> {
|
||||
let mut targets = MultiProofTargetsV2::default();
|
||||
let mut storage_proofs_requested = 0usize;
|
||||
#[allow(unused_variables)]
|
||||
let storage_proofs_skipped = 0usize;
|
||||
|
||||
for (addr, updates) in &mut self.storage_updates {
|
||||
let updates_before = updates.len();
|
||||
let trie = self.trie.get_or_create_storage_trie_mut(*addr);
|
||||
let fetched_storage = self.fetched_storage_targets.entry(*addr).or_default();
|
||||
let _fetched_storage = self.fetched_storage_targets.entry(*addr).or_default();
|
||||
|
||||
trie.update_leaves(updates, |path, min_len| match fetched_storage.entry(path) {
|
||||
Entry::Occupied(mut entry) => {
|
||||
if min_len < *entry.get() {
|
||||
entry.insert(min_len);
|
||||
targets
|
||||
.storage_targets
|
||||
.entry(*addr)
|
||||
.or_default()
|
||||
.push(Target::new(path).with_min_len(min_len));
|
||||
}
|
||||
}
|
||||
Entry::Vacant(entry) => {
|
||||
entry.insert(min_len);
|
||||
targets
|
||||
.storage_targets
|
||||
.entry(*addr)
|
||||
.or_default()
|
||||
.push(Target::new(path).with_min_len(min_len));
|
||||
}
|
||||
trie.update_leaves(updates, |path, min_len| {
|
||||
// Always dispatch proof request, bypassing cache to debug stall issue
|
||||
debug!(
|
||||
target: "engine::tree::payload_processor::sparse_trie",
|
||||
?addr,
|
||||
?path,
|
||||
min_len,
|
||||
"update_leaves: requesting proof for blinded path"
|
||||
);
|
||||
targets
|
||||
.storage_targets
|
||||
.entry(*addr)
|
||||
.or_default()
|
||||
.push(Target::new(path).with_min_len(min_len));
|
||||
storage_proofs_requested += 1;
|
||||
})
|
||||
.map_err(ProviderError::other)?;
|
||||
|
||||
// Log if updates were not drained
|
||||
if !updates.is_empty() {
|
||||
trace!(
|
||||
target: "engine::tree::payload_processor::sparse_trie",
|
||||
?addr,
|
||||
updates_before,
|
||||
updates_remaining = updates.len(),
|
||||
"Storage updates not fully drained"
|
||||
);
|
||||
}
|
||||
|
||||
// If all storage updates were processed, we can now compute the new storage root.
|
||||
if updates.is_empty() {
|
||||
let storage_root =
|
||||
@@ -601,6 +682,15 @@ where
|
||||
}
|
||||
|
||||
if !targets.is_empty() {
|
||||
debug!(
|
||||
target: "engine::tree::payload_processor::sparse_trie",
|
||||
account_targets = targets.account_targets.len(),
|
||||
storage_accounts = targets.storage_targets.len(),
|
||||
storage_slots = targets.storage_targets.values().map(|v| v.len()).sum::<usize>(),
|
||||
storage_proofs_requested,
|
||||
storage_proofs_skipped,
|
||||
"process_updates: dispatching multiproof request"
|
||||
);
|
||||
self.proof_worker_handle.dispatch_account_multiproof(AccountMultiproofInput::V2 {
|
||||
targets,
|
||||
proof_result_sender: ProofResultContext::new(
|
||||
@@ -610,6 +700,17 @@ where
|
||||
Instant::now(),
|
||||
),
|
||||
})?;
|
||||
} else if storage_proofs_skipped > 0 {
|
||||
// This is expected when proofs are in flight - they will be re-requested
|
||||
// after the proof results arrive and the cache is cleared.
|
||||
trace!(
|
||||
target: "engine::tree::payload_processor::sparse_trie",
|
||||
storage_proofs_skipped,
|
||||
storage_proofs_requested,
|
||||
account_updates = self.account_updates.len(),
|
||||
pending_storage = self.storage_updates.values().map(|v| v.len()).sum::<usize>(),
|
||||
"process_updates: awaiting in-flight proofs"
|
||||
);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
||||
@@ -1387,7 +1387,17 @@ where
|
||||
let account_proofs =
|
||||
v2_account_calculator.proof(&mut value_encoder, &mut account_targets)?;
|
||||
|
||||
let (storage_proofs, value_encoder_stats) = value_encoder.finalize()?;
|
||||
debug!(target: "trie::proof_task", "Starting value_encoder.finalize()");
|
||||
let finalize_start = Instant::now();
|
||||
let (storage_proofs, value_encoder_stats) = value_encoder.finalize().unwrap();
|
||||
let finalize_elapsed = finalize_start.elapsed();
|
||||
debug!(
|
||||
target: "trie::proof_task",
|
||||
finalize_elapsed_ms = finalize_elapsed.as_millis(),
|
||||
storage_proofs_count = storage_proofs.len(),
|
||||
storage_wait_time_ms = value_encoder_stats.storage_wait_time.as_millis(),
|
||||
"value_encoder.finalize() completed"
|
||||
);
|
||||
|
||||
let proof = DecodedMultiProofV2 { account_proofs, storage_proofs };
|
||||
|
||||
@@ -1815,15 +1825,13 @@ fn dispatch_v2_storage_proofs(
|
||||
let mut storage_proof_receivers =
|
||||
B256Map::with_capacity_and_hasher(account_targets.len(), Default::default());
|
||||
|
||||
// Collect hashed addresses from account targets that need their storage roots computed
|
||||
let account_target_addresses: B256Set = account_targets.iter().map(|t| t.key()).collect();
|
||||
|
||||
// For storage targets with associated account proofs, ensure the first target has
|
||||
// min_len(0) so the root node is returned for storage root computation
|
||||
for (hashed_address, targets) in &mut storage_targets {
|
||||
if account_target_addresses.contains(hashed_address) &&
|
||||
let Some(first) = targets.first_mut()
|
||||
{
|
||||
// For all storage targets, ensure the first target has min_len(0) so the root node
|
||||
// is always returned. This is needed because:
|
||||
// 1. For accounts with account proofs: storage root computation requires the root
|
||||
// 2. For storage-only targets: if the target path doesn't exist, we still need nodes
|
||||
// to prove non-existence. With min_len > 0, an empty subtrie returns no nodes.
|
||||
for (_hashed_address, targets) in &mut storage_targets {
|
||||
if let Some(first) = targets.first_mut() {
|
||||
*first = first.with_min_len(0);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -280,8 +280,21 @@ impl<TC, HC> AsyncAccountValueEncoder<TC, HC> {
|
||||
|
||||
// Any remaining dispatched proofs need to have their results collected.
|
||||
// These are proofs that were pre-dispatched but not consumed during proof calculation.
|
||||
for (hashed_address, rx) in &self.dispatched {
|
||||
let dispatched_count = self.dispatched.len();
|
||||
tracing::debug!(
|
||||
target: "trie::value_encoder",
|
||||
dispatched_count,
|
||||
"finalize: collecting remaining dispatched proofs"
|
||||
);
|
||||
for (i, (hashed_address, rx)) in self.dispatched.iter().enumerate() {
|
||||
let wait_start = Instant::now();
|
||||
tracing::debug!(
|
||||
target: "trie::value_encoder",
|
||||
i,
|
||||
dispatched_count,
|
||||
?hashed_address,
|
||||
"finalize: waiting for storage proof"
|
||||
);
|
||||
let result = rx
|
||||
.recv()
|
||||
.map_err(|_| {
|
||||
@@ -290,7 +303,16 @@ impl<TC, HC> AsyncAccountValueEncoder<TC, HC> {
|
||||
)))
|
||||
})?
|
||||
.result?;
|
||||
stats.storage_wait_time += wait_start.elapsed();
|
||||
let wait_elapsed = wait_start.elapsed();
|
||||
stats.storage_wait_time += wait_elapsed;
|
||||
tracing::debug!(
|
||||
target: "trie::value_encoder",
|
||||
i,
|
||||
dispatched_count,
|
||||
?hashed_address,
|
||||
wait_elapsed_ms = wait_elapsed.as_millis(),
|
||||
"finalize: received storage proof"
|
||||
);
|
||||
|
||||
let StorageProofResult::V2 { proof, .. } = result else {
|
||||
panic!("StorageProofResult is not V2: {result:?}")
|
||||
|
||||
@@ -1059,6 +1059,8 @@ impl SparseTrieExt for ParallelSparseTrie {
|
||||
}
|
||||
|
||||
fn prune(&mut self, max_depth: usize) -> usize {
|
||||
let prune_start = std::time::Instant::now();
|
||||
|
||||
// Decay heat for subtries not modified this cycle
|
||||
self.subtrie_heat.decay_and_reset();
|
||||
|
||||
@@ -1209,6 +1211,27 @@ impl SparseTrieExt for ParallelSparseTrie {
|
||||
}
|
||||
});
|
||||
|
||||
debug!(
|
||||
target: "trie::sparse::parallel",
|
||||
max_depth,
|
||||
nodes_converted,
|
||||
upper_roots = roots_upper.len(),
|
||||
lower_roots = roots_lower.len(),
|
||||
elapsed_ms = prune_start.elapsed().as_millis(),
|
||||
"ParallelSparseTrie::prune completed - nodes converted to Hash stubs"
|
||||
);
|
||||
|
||||
// Log first few pruned paths for debugging
|
||||
for (i, (path, hash)) in effective_pruned_roots.iter().take(5).enumerate() {
|
||||
trace!(
|
||||
target: "trie::sparse::parallel",
|
||||
i,
|
||||
?path,
|
||||
?hash,
|
||||
"Pruned node path"
|
||||
);
|
||||
}
|
||||
|
||||
nodes_converted
|
||||
}
|
||||
|
||||
@@ -1348,9 +1371,14 @@ impl ParallelSparseTrie {
|
||||
///
|
||||
/// Returns `(target_key, min_len)` where:
|
||||
/// - `target_key` is `full_key` if `path` is a prefix of `full_path`, otherwise the padded path
|
||||
/// - `min_len` is always based on `path.len()`
|
||||
/// - `min_len` is always 0 to ensure we get the full proof including the root.
|
||||
/// This is necessary because blinded nodes created by pruning may point to subtries
|
||||
/// where the target slot doesn't exist, and with min_len > 0 the proof calculator
|
||||
/// would return nothing (no nodes to prove non-existence).
|
||||
fn proof_target_for_path(full_key: B256, full_path: &Nibbles, path: &Nibbles) -> (B256, u8) {
|
||||
let min_len = (path.len() as u8).min(64);
|
||||
// Always use min_len=0 to ensure we get the root and full proof path.
|
||||
// The target key still uses the blinded path to identify what we're looking for.
|
||||
let min_len = 0;
|
||||
let target_key =
|
||||
if full_path.starts_with(path) { full_key } else { Self::nibbles_to_padded_b256(path) };
|
||||
(target_key, min_len)
|
||||
|
||||
@@ -19,9 +19,7 @@ use reth_trie_common::{
|
||||
Nibbles, ProofTrieNode, RlpNode, StorageMultiProof, TrieAccount, TrieNode, EMPTY_ROOT_HASH,
|
||||
TRIE_ACCOUNT_RLP_MAX_SIZE,
|
||||
};
|
||||
#[cfg(feature = "std")]
|
||||
use tracing::debug;
|
||||
use tracing::{instrument, trace};
|
||||
use tracing::{debug, instrument, trace};
|
||||
|
||||
#[derive(Debug)]
|
||||
/// Sparse state trie representing lazy-loaded Ethereum state trie.
|
||||
@@ -505,9 +503,23 @@ where
|
||||
trie: &mut RevealableSparseTrie<S>,
|
||||
retain_updates: bool,
|
||||
) -> SparseStateTrieResult<ProofNodesMetricValues> {
|
||||
let nodes_len_before_filter = nodes.len();
|
||||
let revealed_nodes_len_before = revealed_nodes.len();
|
||||
let FilteredV2ProofNodes { root_node, nodes, new_nodes, metric_values } =
|
||||
filter_revealed_v2_proof_nodes(nodes, revealed_nodes)?;
|
||||
|
||||
debug!(
|
||||
target: "trie::sparse",
|
||||
?account,
|
||||
nodes_len_before_filter,
|
||||
revealed_nodes_len_before,
|
||||
revealed_nodes_len_after = revealed_nodes.len(),
|
||||
nodes_after_filter = nodes.len(),
|
||||
skipped = metric_values.skipped_nodes,
|
||||
has_root = root_node.is_some(),
|
||||
"reveal_storage_v2_proof_nodes_inner: after filter"
|
||||
);
|
||||
|
||||
let trie = if let Some(root_node) = root_node {
|
||||
trace!(target: "trie::sparse", ?account, ?root_node, "Revealing root storage node from V2 proof");
|
||||
trie.reveal_root(root_node.node, root_node.masks, retain_updates)?
|
||||
@@ -1028,18 +1040,36 @@ where
|
||||
#[cfg(feature = "std")]
|
||||
#[instrument(target = "trie::sparse", skip_all, fields(max_depth, max_storage_tries))]
|
||||
pub fn prune(&mut self, max_depth: usize, max_storage_tries: usize) {
|
||||
let revealed_account_paths_before = self.revealed_account_paths.len();
|
||||
let storage_tries_before = self.storage.tries.len();
|
||||
|
||||
// Prune state and storage tries in parallel
|
||||
rayon::join(
|
||||
let (account_nodes_pruned, _) = rayon::join(
|
||||
|| {
|
||||
if let Some(trie) = self.state.as_revealed_mut() {
|
||||
trie.prune(max_depth);
|
||||
}
|
||||
let nodes_pruned = if let Some(trie) = self.state.as_revealed_mut() {
|
||||
trie.prune(max_depth)
|
||||
} else {
|
||||
0
|
||||
};
|
||||
self.revealed_account_paths.clear();
|
||||
nodes_pruned
|
||||
},
|
||||
|| {
|
||||
self.storage.prune(max_depth, max_storage_tries);
|
||||
},
|
||||
);
|
||||
|
||||
debug!(
|
||||
target: "trie::sparse",
|
||||
max_depth,
|
||||
max_storage_tries,
|
||||
revealed_account_paths_before,
|
||||
revealed_account_paths_after = self.revealed_account_paths.len(),
|
||||
storage_tries_before,
|
||||
storage_tries_after = self.storage.tries.len(),
|
||||
account_nodes_pruned,
|
||||
"SparseStateTrie::prune completed - revealed_account_paths cleared"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1144,6 +1174,11 @@ impl<S: SparseTrieTrait + SparseTrieExt> StorageTries<S> {
|
||||
trie.prune(max_depth);
|
||||
heat_state.prune_backlog = 0; // Reset backlog after prune
|
||||
stats.pruned_count += 1;
|
||||
// Clear revealed_paths for this trie since pruning converts nodes back to
|
||||
// SparseNode::Hash. They need to be re-revealed on next proof.
|
||||
if let Some(paths) = self.revealed_paths.get_mut(address) {
|
||||
paths.clear();
|
||||
}
|
||||
}
|
||||
}
|
||||
stats.prune_elapsed = prune_start.elapsed();
|
||||
|
||||
@@ -1226,8 +1226,18 @@ where
|
||||
"Maybe retaining root",
|
||||
);
|
||||
match (sub_trie_targets.retain_root, self.child_stack.pop()) {
|
||||
(false, _) => {
|
||||
// Whether the root node is exists or not, we don't want it.
|
||||
(false, None) => {
|
||||
// Even if retain_root is false, when the trie is empty we must return EmptyRoot
|
||||
// so the caller can reveal that the trie has no data at all. Otherwise the caller
|
||||
// would receive an empty proof and have no way to unblind the trie.
|
||||
self.retained_proofs.push(ProofTrieNode {
|
||||
path: Nibbles::new(),
|
||||
node: TrieNode::EmptyRoot,
|
||||
masks: None,
|
||||
});
|
||||
}
|
||||
(false, Some(_)) => {
|
||||
// Root node exists but we don't want it (min_len > 0 and there are leaves).
|
||||
}
|
||||
(true, None) => {
|
||||
// If `child_stack` is empty it means there was no keys at all, retain an empty
|
||||
|
||||
Reference in New Issue
Block a user