mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-04-30 03:01:58 -04:00
Compare commits
9 Commits
push
...
feat/proof
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
432988c575 | ||
|
|
250b253033 | ||
|
|
ef8bdbced0 | ||
|
|
29548777ba | ||
|
|
af00894f70 | ||
|
|
41ba573c49 | ||
|
|
e6148eedb7 | ||
|
|
f0961d5859 | ||
|
|
77f8baea4e |
3
Cargo.lock
generated
3
Cargo.lock
generated
@@ -8280,6 +8280,7 @@ dependencies = [
|
||||
"alloy-rpc-types-engine",
|
||||
"auto_impl",
|
||||
"futures",
|
||||
"rand 0.9.2",
|
||||
"reth-chain-state",
|
||||
"reth-errors",
|
||||
"reth-ethereum-primitives",
|
||||
@@ -8292,6 +8293,7 @@ dependencies = [
|
||||
"serde",
|
||||
"thiserror 2.0.18",
|
||||
"tokio",
|
||||
"tracing",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -10666,6 +10668,7 @@ dependencies = [
|
||||
"proptest-arbitrary-interop",
|
||||
"rand 0.9.2",
|
||||
"rayon",
|
||||
"reth-engine-primitives",
|
||||
"reth-execution-errors",
|
||||
"reth-metrics",
|
||||
"reth-primitives-traits",
|
||||
|
||||
@@ -107,6 +107,7 @@ js-tracer = [
|
||||
"reth-rpc-eth-types/js-tracer",
|
||||
]
|
||||
|
||||
debug-jitter = ["reth-node-builder/debug-jitter"]
|
||||
dev = ["reth-ethereum-cli/dev"]
|
||||
|
||||
asm-keccak = [
|
||||
|
||||
@@ -36,6 +36,8 @@ futures = { workspace = true, optional = true }
|
||||
auto_impl.workspace = true
|
||||
serde.workspace = true
|
||||
thiserror.workspace = true
|
||||
rand = { workspace = true, optional = true }
|
||||
tracing = { workspace = true, optional = true }
|
||||
|
||||
[features]
|
||||
default = ["std"]
|
||||
@@ -54,3 +56,4 @@ std = [
|
||||
"thiserror/std",
|
||||
"reth-evm/std",
|
||||
]
|
||||
debug-jitter = ["dep:rand", "dep:tracing"]
|
||||
|
||||
54
crates/engine/primitives/src/jitter.rs
Normal file
54
crates/engine/primitives/src/jitter.rs
Normal file
@@ -0,0 +1,54 @@
|
||||
//! Debug jitter utilities for testing timing-related bugs.
|
||||
//!
|
||||
//! When the `debug-jitter` feature is enabled, various components can add
|
||||
//! random delays to help trigger out-of-order timing bugs that may only
|
||||
//! manifest in real-world conditions.
|
||||
//!
|
||||
//! Control via environment variable:
|
||||
//! - `RETH_DEBUG_JITTER_MS`: Maximum jitter in milliseconds (0-N random delay)
|
||||
//!
|
||||
//! Example: `RETH_DEBUG_JITTER_MS=5` adds 0-5ms random delays.
|
||||
|
||||
use std::{sync::OnceLock, thread, time::Duration};
|
||||
|
||||
use rand::Rng;
|
||||
use tracing::trace;
|
||||
|
||||
/// Cached jitter configuration from environment.
|
||||
static JITTER_CONFIG: OnceLock<Option<u64>> = OnceLock::new();
|
||||
|
||||
/// Reads the jitter configuration from environment variables.
|
||||
///
|
||||
/// Returns `Some(max_ms)` if jitter is enabled, `None` otherwise.
|
||||
fn get_jitter_config() -> Option<u64> {
|
||||
*JITTER_CONFIG.get_or_init(|| {
|
||||
std::env::var("RETH_DEBUG_JITTER_MS")
|
||||
.ok()
|
||||
.and_then(|v| v.parse::<u64>().ok())
|
||||
.filter(|&ms| ms > 0)
|
||||
})
|
||||
}
|
||||
|
||||
/// Applies a random jitter delay if configured.
|
||||
///
|
||||
/// When `RETH_DEBUG_JITTER_MS` is set to a positive value N,
|
||||
/// this function sleeps for a random duration between 0 and N milliseconds.
|
||||
///
|
||||
/// This is useful for testing timing-sensitive code paths that may have
|
||||
/// race conditions or ordering bugs that only manifest with variable latencies.
|
||||
///
|
||||
/// The `context` parameter is used for logging to identify where jitter was applied.
|
||||
pub fn maybe_apply_jitter(context: &str) {
|
||||
if let Some(max_ms) = get_jitter_config() {
|
||||
let delay_ms = rand::rng().random_range(0..=max_ms);
|
||||
if delay_ms > 0 {
|
||||
trace!(
|
||||
target: "reth::jitter",
|
||||
context,
|
||||
delay_ms,
|
||||
"Applying debug jitter delay"
|
||||
);
|
||||
thread::sleep(Duration::from_millis(delay_ms));
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -46,6 +46,10 @@ pub use invalid_block_hook::{InvalidBlockHook, InvalidBlockHooks, NoopInvalidBlo
|
||||
pub mod config;
|
||||
pub use config::*;
|
||||
|
||||
/// Debug jitter utilities for testing timing-related bugs.
|
||||
#[cfg(feature = "debug-jitter")]
|
||||
pub mod jitter;
|
||||
|
||||
/// This type defines the versioned types of the engine API based on the [ethereum engine API](https://github.com/ethereum/execution-apis/tree/main/src/engine).
|
||||
///
|
||||
/// This includes the execution payload types and payload attributes that are used to trigger a
|
||||
|
||||
@@ -116,6 +116,7 @@ name = "state_root_task"
|
||||
harness = false
|
||||
|
||||
[features]
|
||||
debug-jitter = ["reth-engine-primitives/debug-jitter", "reth-trie-parallel/debug-jitter"]
|
||||
test-utils = [
|
||||
"reth-chain-state/test-utils",
|
||||
"reth-chainspec/test-utils",
|
||||
|
||||
@@ -596,6 +596,10 @@ where
|
||||
multiproof_targets_from_state(res.state, v2_proofs_enabled);
|
||||
metrics.prefetch_storage_targets.record(storage_targets as f64);
|
||||
if let Some(to_multi_proof) = &to_multi_proof {
|
||||
// Apply debug jitter before sending result (helps trigger timing-related bugs)
|
||||
//#[cfg(feature = "debug-jitter")]
|
||||
//reth_engine_primitives::jitter::maybe_apply_jitter("prewarm::worker_outcome"
|
||||
// );
|
||||
let _ = to_multi_proof.send(MultiProofMessage::PrefetchProofs(targets));
|
||||
}
|
||||
drop(_enter);
|
||||
|
||||
@@ -623,6 +623,7 @@ where
|
||||
if new { &mut self.new_storage_updates } else { &mut self.storage_updates };
|
||||
|
||||
// Process all storage updates in parallel, skipping tries with no pending updates.
|
||||
let span = tracing::Span::current();
|
||||
let storage_results = storage_updates
|
||||
.iter_mut()
|
||||
.filter(|(_, updates)| !updates.is_empty())
|
||||
@@ -634,6 +635,7 @@ where
|
||||
})
|
||||
.par_bridge_buffered()
|
||||
.map(|(address, updates, mut fetched, mut trie)| {
|
||||
let _enter = debug_span!(target: "engine::tree::payload_processor::sparse_trie", parent: &span, "storage trie leaf updates", ?address).entered();
|
||||
let mut targets = Vec::new();
|
||||
|
||||
trie.update_leaves(updates, |path, min_len| match fetched.entry(path) {
|
||||
@@ -714,6 +716,7 @@ where
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let span = tracing::Span::current();
|
||||
let roots = self
|
||||
.trie
|
||||
.storage_tries_mut()
|
||||
@@ -722,6 +725,7 @@ where
|
||||
self.storage_updates.get(*address).is_some_and(|updates| updates.is_empty())
|
||||
})
|
||||
.map(|(address, trie)| {
|
||||
let _enter = debug_span!(target: "engine::tree::payload_processor::sparse_trie", parent: &span, "storage root", ?address).entered();
|
||||
let root =
|
||||
trie.root().expect("updates are drained, trie should be revealed by now");
|
||||
|
||||
|
||||
@@ -834,6 +834,11 @@ where
|
||||
let gas_used = executor.execute_transaction(tx)?;
|
||||
self.metrics.record_transaction_execution(tx_start.elapsed());
|
||||
|
||||
// Apply debug jitter before sending result (helps trigger timing-related bugs)
|
||||
//#[cfg(feature = "debug-jitter")]
|
||||
//reth_engine_primitives::jitter::maybe_apply_jitter("
|
||||
// payload_validator::main_execution");
|
||||
|
||||
let current_len = executor.receipts().len();
|
||||
if current_len > last_sent_len {
|
||||
last_sent_len = current_len;
|
||||
|
||||
@@ -96,6 +96,7 @@ reth-evm-ethereum = { workspace = true, features = ["test-utils"] }
|
||||
|
||||
[features]
|
||||
default = []
|
||||
debug-jitter = ["reth-engine-tree/debug-jitter"]
|
||||
js-tracer = [
|
||||
"reth-rpc/js-tracer",
|
||||
"reth-node-ethereum/js-tracer",
|
||||
|
||||
@@ -14,6 +14,7 @@ workspace = true
|
||||
[dependencies]
|
||||
# reth
|
||||
reth-primitives-traits = { workspace = true, features = ["dashmap"] }
|
||||
reth-engine-primitives = { workspace = true, optional = true }
|
||||
reth-execution-errors.workspace = true
|
||||
reth-provider.workspace = true
|
||||
reth-storage-errors.workspace = true
|
||||
@@ -40,6 +41,8 @@ crossbeam-channel.workspace = true
|
||||
reth-metrics = { workspace = true, optional = true }
|
||||
metrics = { workspace = true, optional = true }
|
||||
|
||||
|
||||
|
||||
[dev-dependencies]
|
||||
# reth
|
||||
reth-primitives-traits.workspace = true
|
||||
@@ -57,6 +60,7 @@ tokio = { workspace = true, features = ["rt", "rt-multi-thread", "macros"] }
|
||||
[features]
|
||||
default = ["metrics"]
|
||||
metrics = ["reth-metrics", "dep:metrics", "reth-trie/metrics", "reth-trie-sparse/metrics"]
|
||||
debug-jitter = ["reth-engine-primitives/debug-jitter"]
|
||||
test-utils = [
|
||||
"reth-primitives-traits/test-utils",
|
||||
"reth-provider/test-utils",
|
||||
|
||||
3
crates/trie/parallel/src/jitter.rs
Normal file
3
crates/trie/parallel/src/jitter.rs
Normal file
@@ -0,0 +1,3 @@
|
||||
//! Re-export of debug jitter utilities from `reth-engine-primitives`.
|
||||
|
||||
pub(crate) use reth_engine_primitives::jitter::maybe_apply_jitter;
|
||||
@@ -35,3 +35,7 @@ pub mod metrics;
|
||||
/// Proof task manager metrics.
|
||||
#[cfg(feature = "metrics")]
|
||||
pub mod proof_task_metrics;
|
||||
|
||||
/// Debug jitter utilities for testing timing-related bugs.
|
||||
#[cfg(feature = "debug-jitter")]
|
||||
mod jitter;
|
||||
|
||||
@@ -942,6 +942,10 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
// Apply debug jitter if configured (helps trigger timing-related bugs)
|
||||
#[cfg(feature = "debug-jitter")]
|
||||
crate::jitter::maybe_apply_jitter("proof_task::storage_worker");
|
||||
|
||||
// Mark worker as available again.
|
||||
self.available_workers.fetch_add(1, Ordering::Relaxed);
|
||||
|
||||
@@ -1272,6 +1276,10 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
// Apply debug jitter if configured (helps trigger timing-related bugs)
|
||||
#[cfg(feature = "debug-jitter")]
|
||||
crate::jitter::maybe_apply_jitter("proof_task::account_worker");
|
||||
|
||||
// Mark worker as available again.
|
||||
self.available_workers.fetch_add(1, Ordering::Relaxed);
|
||||
|
||||
|
||||
@@ -244,6 +244,10 @@ impl SparseTrie for ParallelSparseTrie {
|
||||
// Reveal lower subtrie nodes in parallel
|
||||
{
|
||||
use rayon::iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator};
|
||||
use tracing::Span;
|
||||
|
||||
// Capture the current span so it can be propagated to rayon worker threads
|
||||
let parent_span = Span::current();
|
||||
|
||||
// Group the nodes by lower subtrie. This must be collected into a Vec in order for
|
||||
// rayon's `zip` to be happy.
|
||||
@@ -291,6 +295,10 @@ impl SparseTrie for ParallelSparseTrie {
|
||||
.into_par_iter()
|
||||
.zip(node_groups.into_par_iter())
|
||||
.map(|((subtrie_idx, mut subtrie), nodes)| {
|
||||
// Enter the parent span to propagate context (e.g., hashed_address for storage
|
||||
// tries) to the worker thread
|
||||
let _guard = parent_span.enter();
|
||||
|
||||
// reserve space in the HashMap ahead of time; doing it on a node-by-node basis
|
||||
// can cause multiple re-allocations as the hashmap grows.
|
||||
subtrie.nodes.reserve(nodes.len());
|
||||
@@ -326,6 +334,13 @@ impl SparseTrie for ParallelSparseTrie {
|
||||
value: Vec<u8>,
|
||||
provider: P,
|
||||
) -> SparseTrieResult<()> {
|
||||
trace!(
|
||||
target: "trie::parallel_sparse",
|
||||
?full_path,
|
||||
value_len = value.len(),
|
||||
"Updating leaf",
|
||||
);
|
||||
|
||||
// Check if the value already exists - if so, just update it (no structural changes needed)
|
||||
if self.upper_subtrie.inner.values.contains_key(&full_path) {
|
||||
self.prefix_set.insert(full_path);
|
||||
@@ -577,6 +592,12 @@ impl SparseTrie for ParallelSparseTrie {
|
||||
full_path: &Nibbles,
|
||||
provider: P,
|
||||
) -> SparseTrieResult<()> {
|
||||
trace!(
|
||||
target: "trie::parallel_sparse",
|
||||
?full_path,
|
||||
"Removing leaf",
|
||||
);
|
||||
|
||||
// When removing a leaf node it's possibly necessary to modify its parent node, and possibly
|
||||
// the parent's parent node. It is not ever necessary to descend further than that; once an
|
||||
// extension node is hit it must terminate in a branch or the root, which won't need further
|
||||
@@ -2706,6 +2727,14 @@ impl SparseSubtrie {
|
||||
return Ok(false)
|
||||
}
|
||||
|
||||
trace!(
|
||||
target: "trie::parallel_sparse",
|
||||
?path,
|
||||
?node,
|
||||
?masks,
|
||||
"Revealing node",
|
||||
);
|
||||
|
||||
match node {
|
||||
TrieNode::EmptyRoot => {
|
||||
// For an empty root, ensure that we are at the root path, and at the upper subtrie.
|
||||
@@ -3036,6 +3065,14 @@ impl SparseSubtrieInner {
|
||||
self.buffers.rlp_buf.clear();
|
||||
let rlp_node = LeafNodeRef { key, value }.rlp(&mut self.buffers.rlp_buf);
|
||||
*hash = rlp_node.as_hash();
|
||||
trace!(
|
||||
target: "trie::parallel_sparse",
|
||||
?path,
|
||||
?key,
|
||||
value = %alloy_primitives::hex::encode(value),
|
||||
?hash,
|
||||
"Calculated leaf hash",
|
||||
);
|
||||
(rlp_node, SparseNodeType::Leaf)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user