Compare commits

...

9 Commits

Author SHA1 Message Date
Brian Picciano
432988c575 more tracing 2026-02-06 23:14:06 +01:00
Brian Picciano
250b253033 tracing 2026-02-06 22:42:09 +01:00
Brian Picciano
ef8bdbced0 storage trie spans 2026-02-06 21:47:21 +01:00
Brian Picciano
29548777ba Merge remote-tracking branch 'origin/main' into feat/proof-worker-debug-jitter 2026-02-06 21:29:01 +01:00
Brian Picciano
af00894f70 don't jitter execution 2026-02-06 20:58:37 +01:00
Brian Picciano
41ba573c49 feat: add debug jitter to main execution and prewarm workers
Add jitter points in:
- payload_validator: before sending tx results from main execution thread
- prewarm: before sending outcomes from prewarm workers

This helps trigger timing-related bugs in the coordination between
main execution and prewarm/proof workers.
2026-02-06 19:55:12 +01:00
Brian Picciano
e6148eedb7 refactor: move debug-jitter to reth-engine-primitives
Move the jitter utility from reth-trie-parallel to reth-engine-primitives
so it can be reused in other components like payload validation and
prewarm workers.

- Create jitter module in reth-engine-primitives
- Update reth-trie-parallel to re-export from engine-primitives
- Rename env var from RETH_PROOF_WORKER_JITTER_MS to RETH_DEBUG_JITTER_MS
- Simplify API to take a single context string
- Propagate debug-jitter feature through the crate hierarchy

Amp-Thread-ID: https://ampcode.com/threads/T-019c3416-f509-7642-94c8-0e2455861c70
Co-authored-by: Amp <amp@ampcode.com>
2026-02-06 19:53:19 +01:00
Brian Picciano
f0961d5859 Propagate to workspace 2026-02-06 19:52:30 +01:00
Georgios Konstantopoulos
77f8baea4e feat(trie): add debug jitter to proof workers
Add optional random delays (0-N ms) after proof worker jobs complete to help
trigger timing-related bugs that may only manifest in real-world conditions.

Controlled via environment variable:
- RETH_PROOF_WORKER_JITTER_MS=5 adds 0-5ms random delays

This is useful for testing race conditions or ordering bugs in the sparse trie
cache that depend on proof result timing.

Gated behind the `debug-jitter` feature flag to avoid any production overhead.

Amp-Thread-ID: https://ampcode.com/threads/T-019c33f3-83a8-7044-96e5-655fd756287f
Co-authored-by: Amp <amp@ampcode.com>
2026-02-06 19:52:30 +01:00
15 changed files with 136 additions and 0 deletions

3
Cargo.lock generated
View File

@@ -8280,6 +8280,7 @@ dependencies = [
"alloy-rpc-types-engine",
"auto_impl",
"futures",
"rand 0.9.2",
"reth-chain-state",
"reth-errors",
"reth-ethereum-primitives",
@@ -8292,6 +8293,7 @@ dependencies = [
"serde",
"thiserror 2.0.18",
"tokio",
"tracing",
]
[[package]]
@@ -10666,6 +10668,7 @@ dependencies = [
"proptest-arbitrary-interop",
"rand 0.9.2",
"rayon",
"reth-engine-primitives",
"reth-execution-errors",
"reth-metrics",
"reth-primitives-traits",

View File

@@ -107,6 +107,7 @@ js-tracer = [
"reth-rpc-eth-types/js-tracer",
]
debug-jitter = ["reth-node-builder/debug-jitter"]
dev = ["reth-ethereum-cli/dev"]
asm-keccak = [

View File

@@ -36,6 +36,8 @@ futures = { workspace = true, optional = true }
auto_impl.workspace = true
serde.workspace = true
thiserror.workspace = true
rand = { workspace = true, optional = true }
tracing = { workspace = true, optional = true }
[features]
default = ["std"]
@@ -54,3 +56,4 @@ std = [
"thiserror/std",
"reth-evm/std",
]
debug-jitter = ["dep:rand", "dep:tracing"]

View File

@@ -0,0 +1,54 @@
//! Debug jitter utilities for testing timing-related bugs.
//!
//! When the `debug-jitter` feature is enabled, various components can add
//! random delays to help trigger out-of-order timing bugs that may only
//! manifest in real-world conditions.
//!
//! Control via environment variable:
//! - `RETH_DEBUG_JITTER_MS`: Maximum jitter in milliseconds (0-N random delay)
//!
//! Example: `RETH_DEBUG_JITTER_MS=5` adds 0-5ms random delays.
use std::{sync::OnceLock, thread, time::Duration};
use rand::Rng;
use tracing::trace;
/// Cached jitter configuration from environment.
static JITTER_CONFIG: OnceLock<Option<u64>> = OnceLock::new();
/// Reads the jitter configuration from environment variables.
///
/// Returns `Some(max_ms)` if jitter is enabled, `None` otherwise.
fn get_jitter_config() -> Option<u64> {
*JITTER_CONFIG.get_or_init(|| {
std::env::var("RETH_DEBUG_JITTER_MS")
.ok()
.and_then(|v| v.parse::<u64>().ok())
.filter(|&ms| ms > 0)
})
}
/// Applies a random jitter delay if configured.
///
/// When `RETH_DEBUG_JITTER_MS` is set to a positive value N,
/// this function sleeps for a random duration between 0 and N milliseconds.
///
/// This is useful for testing timing-sensitive code paths that may have
/// race conditions or ordering bugs that only manifest with variable latencies.
///
/// The `context` parameter is used for logging to identify where jitter was applied.
pub fn maybe_apply_jitter(context: &str) {
if let Some(max_ms) = get_jitter_config() {
let delay_ms = rand::rng().random_range(0..=max_ms);
if delay_ms > 0 {
trace!(
target: "reth::jitter",
context,
delay_ms,
"Applying debug jitter delay"
);
thread::sleep(Duration::from_millis(delay_ms));
}
}
}

View File

@@ -46,6 +46,10 @@ pub use invalid_block_hook::{InvalidBlockHook, InvalidBlockHooks, NoopInvalidBlo
pub mod config;
pub use config::*;
/// Debug jitter utilities for testing timing-related bugs.
#[cfg(feature = "debug-jitter")]
pub mod jitter;
/// This type defines the versioned types of the engine API based on the [ethereum engine API](https://github.com/ethereum/execution-apis/tree/main/src/engine).
///
/// This includes the execution payload types and payload attributes that are used to trigger a

View File

@@ -116,6 +116,7 @@ name = "state_root_task"
harness = false
[features]
debug-jitter = ["reth-engine-primitives/debug-jitter", "reth-trie-parallel/debug-jitter"]
test-utils = [
"reth-chain-state/test-utils",
"reth-chainspec/test-utils",

View File

@@ -596,6 +596,10 @@ where
multiproof_targets_from_state(res.state, v2_proofs_enabled);
metrics.prefetch_storage_targets.record(storage_targets as f64);
if let Some(to_multi_proof) = &to_multi_proof {
// Apply debug jitter before sending result (helps trigger timing-related bugs)
//#[cfg(feature = "debug-jitter")]
//reth_engine_primitives::jitter::maybe_apply_jitter("prewarm::worker_outcome"
// );
let _ = to_multi_proof.send(MultiProofMessage::PrefetchProofs(targets));
}
drop(_enter);

View File

@@ -623,6 +623,7 @@ where
if new { &mut self.new_storage_updates } else { &mut self.storage_updates };
// Process all storage updates in parallel, skipping tries with no pending updates.
let span = tracing::Span::current();
let storage_results = storage_updates
.iter_mut()
.filter(|(_, updates)| !updates.is_empty())
@@ -634,6 +635,7 @@ where
})
.par_bridge_buffered()
.map(|(address, updates, mut fetched, mut trie)| {
let _enter = debug_span!(target: "engine::tree::payload_processor::sparse_trie", parent: &span, "storage trie leaf updates", ?address).entered();
let mut targets = Vec::new();
trie.update_leaves(updates, |path, min_len| match fetched.entry(path) {
@@ -714,6 +716,7 @@ where
return Ok(());
}
let span = tracing::Span::current();
let roots = self
.trie
.storage_tries_mut()
@@ -722,6 +725,7 @@ where
self.storage_updates.get(*address).is_some_and(|updates| updates.is_empty())
})
.map(|(address, trie)| {
let _enter = debug_span!(target: "engine::tree::payload_processor::sparse_trie", parent: &span, "storage root", ?address).entered();
let root =
trie.root().expect("updates are drained, trie should be revealed by now");

View File

@@ -834,6 +834,11 @@ where
let gas_used = executor.execute_transaction(tx)?;
self.metrics.record_transaction_execution(tx_start.elapsed());
// Apply debug jitter before sending result (helps trigger timing-related bugs)
//#[cfg(feature = "debug-jitter")]
//reth_engine_primitives::jitter::maybe_apply_jitter("
// payload_validator::main_execution");
let current_len = executor.receipts().len();
if current_len > last_sent_len {
last_sent_len = current_len;

View File

@@ -96,6 +96,7 @@ reth-evm-ethereum = { workspace = true, features = ["test-utils"] }
[features]
default = []
debug-jitter = ["reth-engine-tree/debug-jitter"]
js-tracer = [
"reth-rpc/js-tracer",
"reth-node-ethereum/js-tracer",

View File

@@ -14,6 +14,7 @@ workspace = true
[dependencies]
# reth
reth-primitives-traits = { workspace = true, features = ["dashmap"] }
reth-engine-primitives = { workspace = true, optional = true }
reth-execution-errors.workspace = true
reth-provider.workspace = true
reth-storage-errors.workspace = true
@@ -40,6 +41,8 @@ crossbeam-channel.workspace = true
reth-metrics = { workspace = true, optional = true }
metrics = { workspace = true, optional = true }
[dev-dependencies]
# reth
reth-primitives-traits.workspace = true
@@ -57,6 +60,7 @@ tokio = { workspace = true, features = ["rt", "rt-multi-thread", "macros"] }
[features]
default = ["metrics"]
metrics = ["reth-metrics", "dep:metrics", "reth-trie/metrics", "reth-trie-sparse/metrics"]
debug-jitter = ["reth-engine-primitives/debug-jitter"]
test-utils = [
"reth-primitives-traits/test-utils",
"reth-provider/test-utils",

View File

@@ -0,0 +1,3 @@
//! Re-export of debug jitter utilities from `reth-engine-primitives`.
pub(crate) use reth_engine_primitives::jitter::maybe_apply_jitter;

View File

@@ -35,3 +35,7 @@ pub mod metrics;
/// Proof task manager metrics.
#[cfg(feature = "metrics")]
pub mod proof_task_metrics;
/// Debug jitter utilities for testing timing-related bugs.
#[cfg(feature = "debug-jitter")]
mod jitter;

View File

@@ -942,6 +942,10 @@ where
}
}
// Apply debug jitter if configured (helps trigger timing-related bugs)
#[cfg(feature = "debug-jitter")]
crate::jitter::maybe_apply_jitter("proof_task::storage_worker");
// Mark worker as available again.
self.available_workers.fetch_add(1, Ordering::Relaxed);
@@ -1272,6 +1276,10 @@ where
}
}
// Apply debug jitter if configured (helps trigger timing-related bugs)
#[cfg(feature = "debug-jitter")]
crate::jitter::maybe_apply_jitter("proof_task::account_worker");
// Mark worker as available again.
self.available_workers.fetch_add(1, Ordering::Relaxed);

View File

@@ -244,6 +244,10 @@ impl SparseTrie for ParallelSparseTrie {
// Reveal lower subtrie nodes in parallel
{
use rayon::iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator};
use tracing::Span;
// Capture the current span so it can be propagated to rayon worker threads
let parent_span = Span::current();
// Group the nodes by lower subtrie. This must be collected into a Vec in order for
// rayon's `zip` to be happy.
@@ -291,6 +295,10 @@ impl SparseTrie for ParallelSparseTrie {
.into_par_iter()
.zip(node_groups.into_par_iter())
.map(|((subtrie_idx, mut subtrie), nodes)| {
// Enter the parent span to propagate context (e.g., hashed_address for storage
// tries) to the worker thread
let _guard = parent_span.enter();
// reserve space in the HashMap ahead of time; doing it on a node-by-node basis
// can cause multiple re-allocations as the hashmap grows.
subtrie.nodes.reserve(nodes.len());
@@ -326,6 +334,13 @@ impl SparseTrie for ParallelSparseTrie {
value: Vec<u8>,
provider: P,
) -> SparseTrieResult<()> {
trace!(
target: "trie::parallel_sparse",
?full_path,
value_len = value.len(),
"Updating leaf",
);
// Check if the value already exists - if so, just update it (no structural changes needed)
if self.upper_subtrie.inner.values.contains_key(&full_path) {
self.prefix_set.insert(full_path);
@@ -577,6 +592,12 @@ impl SparseTrie for ParallelSparseTrie {
full_path: &Nibbles,
provider: P,
) -> SparseTrieResult<()> {
trace!(
target: "trie::parallel_sparse",
?full_path,
"Removing leaf",
);
// When removing a leaf node it's possibly necessary to modify its parent node, and possibly
// the parent's parent node. It is not ever necessary to descend further than that; once an
// extension node is hit it must terminate in a branch or the root, which won't need further
@@ -2706,6 +2727,14 @@ impl SparseSubtrie {
return Ok(false)
}
trace!(
target: "trie::parallel_sparse",
?path,
?node,
?masks,
"Revealing node",
);
match node {
TrieNode::EmptyRoot => {
// For an empty root, ensure that we are at the root path, and at the upper subtrie.
@@ -3036,6 +3065,14 @@ impl SparseSubtrieInner {
self.buffers.rlp_buf.clear();
let rlp_node = LeafNodeRef { key, value }.rlp(&mut self.buffers.rlp_buf);
*hash = rlp_node.as_hash();
trace!(
target: "trie::parallel_sparse",
?path,
?key,
value = %alloy_primitives::hex::encode(value),
?hash,
"Calculated leaf hash",
);
(rlp_node, SparseNodeType::Leaf)
}
}