From 5bccdc4a5d9d0ac9ab2de699554e4690dd9b8244 Mon Sep 17 00:00:00 2001 From: Georgios Konstantopoulos Date: Wed, 11 Feb 2026 15:45:45 -0500 Subject: [PATCH] feat(engine): add state root task timeout with sequential fallback (#22004) Co-authored-by: Amp --- crates/engine/primitives/src/config.rs | 23 ++++ crates/engine/tree/src/tree/metrics.rs | 2 + .../tree/src/tree/payload_processor/mod.rs | 12 ++ .../engine/tree/src/tree/payload_validator.rs | 109 +++++++++++++++++- crates/node/core/src/args/engine.rs | 33 +++++- docs/vocs/docs/pages/cli/reth/node.mdx | 9 ++ 6 files changed, 182 insertions(+), 6 deletions(-) diff --git a/crates/engine/primitives/src/config.rs b/crates/engine/primitives/src/config.rs index bfdd61782d..2a57fda6f9 100644 --- a/crates/engine/primitives/src/config.rs +++ b/crates/engine/primitives/src/config.rs @@ -1,6 +1,7 @@ //! Engine tree configuration. use alloy_eips::merge::EPOCH_SLOTS; +use core::time::Duration; /// Triggers persistence when the number of canonical blocks in memory exceeds this threshold. pub const DEFAULT_PERSISTENCE_THRESHOLD: u64 = 2; @@ -64,6 +65,9 @@ pub const DEFAULT_SPARSE_TRIE_PRUNE_DEPTH: usize = 4; /// Storage tries beyond this limit are cleared (but allocations preserved). pub const DEFAULT_SPARSE_TRIE_MAX_STORAGE_TRIES: usize = 100; +/// Default timeout for the state root task before spawning a sequential fallback. +pub const DEFAULT_STATE_ROOT_TASK_TIMEOUT: Duration = Duration::from_secs(1); + const DEFAULT_BLOCK_BUFFER_LIMIT: u32 = EPOCH_SLOTS as u32 * 2; const DEFAULT_MAX_INVALID_HEADER_CACHE_LENGTH: u32 = 256; const DEFAULT_MAX_EXECUTE_BLOCK_BATCH_SIZE: usize = 4; @@ -175,6 +179,11 @@ pub struct TreeConfig { sparse_trie_prune_depth: usize, /// Maximum number of storage tries to retain after pruning. sparse_trie_max_storage_tries: usize, + /// Timeout for the state root task before spawning a sequential fallback computation. + /// If `Some`, after waiting this duration for the state root task, a sequential state root + /// computation is spawned in parallel and whichever finishes first is used. + /// If `None`, the timeout fallback is disabled. + state_root_task_timeout: Option, } impl Default for TreeConfig { @@ -207,6 +216,7 @@ impl Default for TreeConfig { disable_trie_cache: false, sparse_trie_prune_depth: DEFAULT_SPARSE_TRIE_PRUNE_DEPTH, sparse_trie_max_storage_tries: DEFAULT_SPARSE_TRIE_MAX_STORAGE_TRIES, + state_root_task_timeout: Some(DEFAULT_STATE_ROOT_TASK_TIMEOUT), } } } @@ -241,6 +251,7 @@ impl TreeConfig { disable_cache_metrics: bool, sparse_trie_prune_depth: usize, sparse_trie_max_storage_tries: usize, + state_root_task_timeout: Option, ) -> Self { Self { persistence_threshold, @@ -270,6 +281,7 @@ impl TreeConfig { disable_trie_cache: false, sparse_trie_prune_depth, sparse_trie_max_storage_tries, + state_root_task_timeout, } } @@ -618,4 +630,15 @@ impl TreeConfig { self.sparse_trie_max_storage_tries = max_tries; self } + + /// Returns the state root task timeout. + pub const fn state_root_task_timeout(&self) -> Option { + self.state_root_task_timeout + } + + /// Setter for state root task timeout. + pub const fn with_state_root_task_timeout(mut self, timeout: Option) -> Self { + self.state_root_task_timeout = timeout; + self + } } diff --git a/crates/engine/tree/src/tree/metrics.rs b/crates/engine/tree/src/tree/metrics.rs index cd2a8bce5c..e6f9a17833 100644 --- a/crates/engine/tree/src/tree/metrics.rs +++ b/crates/engine/tree/src/tree/metrics.rs @@ -434,6 +434,8 @@ pub struct BlockValidationMetrics { pub state_root_parallel_fallback_total: Counter, /// Total number of times the state root task failed but the fallback succeeded. pub state_root_task_fallback_success_total: Counter, + /// Total number of times the state root task timed out and a sequential fallback was spawned. + pub state_root_task_timeout_total: Counter, /// Latest state root duration, ie the time spent blocked waiting for the state root. pub state_root_duration: Gauge, /// Histogram for state root duration ie the time spent blocked waiting for the state root diff --git a/crates/engine/tree/src/tree/payload_processor/mod.rs b/crates/engine/tree/src/tree/payload_processor/mod.rs index 84eee2a0a1..9306288b5c 100644 --- a/crates/engine/tree/src/tree/payload_processor/mod.rs +++ b/crates/engine/tree/src/tree/payload_processor/mod.rs @@ -732,6 +732,18 @@ impl PayloadHandle { .map_err(|_| ParallelStateRootError::Other("sparse trie task dropped".to_string()))? } + /// Takes the state root receiver out of the handle for use with custom waiting logic + /// (e.g., timeout-based waiting). + /// + /// # Panics + /// + /// If payload processing was started without background tasks. + pub const fn take_state_root_rx( + &mut self, + ) -> mpsc::Receiver> { + self.state_root.take().expect("state_root is None") + } + /// Returns a state hook to be used to send state updates to this task. /// /// If a multiproof task is spawned the hook will notify it about new states. diff --git a/crates/engine/tree/src/tree/payload_validator.rs b/crates/engine/tree/src/tree/payload_validator.rs index 11b2d20c56..33956591af 100644 --- a/crates/engine/tree/src/tree/payload_validator.rs +++ b/crates/engine/tree/src/tree/payload_validator.rs @@ -49,7 +49,7 @@ use revm_primitives::Address; use std::{ collections::HashMap, panic::{self, AssertUnwindSafe}, - sync::Arc, + sync::{mpsc::RecvTimeoutError, Arc}, time::Instant, }; use tracing::{debug, debug_span, error, info, instrument, trace, warn}; @@ -520,7 +520,17 @@ where match strategy { StateRootStrategy::StateRootTask => { debug!(target: "engine::tree::payload_validator", "Using sparse trie state root algorithm"); - match handle.state_root() { + + let task_result = ensure_ok_post_block!( + self.await_state_root_with_timeout( + &mut handle, + overlay_factory.clone(), + &hashed_state, + ), + block + ); + + match task_result { Ok(StateRootComputeOutcome { state_root, trie_updates }) => { let elapsed = root_time.elapsed(); info!(target: "engine::tree::payload_validator", ?state_root, ?elapsed, "State root task finished"); @@ -591,7 +601,7 @@ where } let (root, updates) = ensure_ok_post_block!( - self.compute_state_root_serial(overlay_factory.clone(), &hashed_state), + Self::compute_state_root_serial(overlay_factory.clone(), &hashed_state), block ); @@ -889,7 +899,6 @@ where /// [`HashedPostState`] containing the changes of this block, to compute the state root and /// trie updates for this block. fn compute_state_root_serial( - &self, overlay_factory: OverlayStateProviderFactory

, hashed_state: &HashedPostState, ) -> ProviderResult<(B256, TrieUpdates)> { @@ -907,6 +916,96 @@ where .root_with_updates()?) } + /// Awaits the state root from the background task, with an optional timeout fallback. + /// + /// If a timeout is configured (`state_root_task_timeout`), this method first waits for the + /// state root task up to the timeout duration. If the task doesn't complete in time, a + /// sequential state root computation is spawned via `spawn_blocking`. Both computations + /// then race: the main thread polls the task receiver and the sequential result channel + /// in a loop, returning whichever finishes first. + /// + /// If no timeout is configured, this simply awaits the state root task without any fallback. + /// + /// Returns `ProviderResult>` where the outer `ProviderResult` captures + /// unrecoverable errors from the sequential fallback (e.g. DB errors), while the inner + /// `Result` captures parallel state root task errors that can still fall back to serial. + fn await_state_root_with_timeout( + &self, + handle: &mut PayloadHandle, + overlay_factory: OverlayStateProviderFactory

, + hashed_state: &HashedPostState, + ) -> ProviderResult> { + let Some(timeout) = self.config.state_root_task_timeout() else { + return Ok(handle.state_root()); + }; + + let task_rx = handle.take_state_root_rx(); + + match task_rx.recv_timeout(timeout) { + Ok(result) => Ok(result), + Err(RecvTimeoutError::Disconnected) => { + Ok(Err(ParallelStateRootError::Other("sparse trie task dropped".to_string()))) + } + Err(RecvTimeoutError::Timeout) => { + warn!( + target: "engine::tree::payload_validator", + ?timeout, + "State root task timed out, spawning sequential fallback" + ); + self.metrics.block_validation.state_root_task_timeout_total.increment(1); + + let (seq_tx, seq_rx) = + std::sync::mpsc::channel::>(); + + let seq_overlay = overlay_factory; + let seq_hashed_state = hashed_state.clone(); + self.payload_processor.executor().spawn_blocking(move || { + let result = Self::compute_state_root_serial(seq_overlay, &seq_hashed_state); + let _ = seq_tx.send(result); + }); + + const POLL_INTERVAL: std::time::Duration = std::time::Duration::from_millis(10); + + loop { + match task_rx.recv_timeout(POLL_INTERVAL) { + Ok(result) => { + debug!( + target: "engine::tree::payload_validator", + source = "task", + "State root timeout race won" + ); + return Ok(result); + } + Err(RecvTimeoutError::Disconnected) => { + debug!( + target: "engine::tree::payload_validator", + "State root task dropped, waiting for sequential fallback" + ); + let result = seq_rx.recv().map_err(|_| { + ProviderError::other(std::io::Error::other( + "both state root computations failed", + )) + })?; + let (state_root, trie_updates) = result?; + return Ok(Ok(StateRootComputeOutcome { state_root, trie_updates })); + } + Err(RecvTimeoutError::Timeout) => {} + } + + if let Ok(result) = seq_rx.try_recv() { + debug!( + target: "engine::tree::payload_validator", + source = "sequential", + "State root timeout race won" + ); + let (state_root, trie_updates) = result?; + return Ok(Ok(StateRootComputeOutcome { state_root, trie_updates })); + } + } + } + } + } + /// Compares trie updates from the state root task with serial state root computation. /// /// This is used for debugging and validating the correctness of the parallel state root @@ -921,7 +1020,7 @@ where ) { debug!(target: "engine::tree::payload_validator", "Comparing trie updates with serial computation"); - match self.compute_state_root_serial(overlay_factory.clone(), hashed_state) { + match Self::compute_state_root_serial(overlay_factory.clone(), hashed_state) { Ok((serial_root, serial_trie_updates)) => { debug!( target: "engine::tree::payload_validator", diff --git a/crates/node/core/src/args/engine.rs b/crates/node/core/src/args/engine.rs index daa0b45b50..bd7db697ff 100644 --- a/crates/node/core/src/args/engine.rs +++ b/crates/node/core/src/args/engine.rs @@ -5,7 +5,7 @@ use reth_engine_primitives::{ TreeConfig, DEFAULT_MULTIPROOF_TASK_CHUNK_SIZE, DEFAULT_SPARSE_TRIE_MAX_STORAGE_TRIES, DEFAULT_SPARSE_TRIE_PRUNE_DEPTH, }; -use std::sync::OnceLock; +use std::{sync::OnceLock, time::Duration}; use crate::node_config::{ DEFAULT_CROSS_BLOCK_CACHE_SIZE_MB, DEFAULT_MEMORY_BLOCK_BUFFER_TARGET, @@ -43,6 +43,7 @@ pub struct DefaultEngineValues { disable_trie_cache: bool, sparse_trie_prune_depth: usize, sparse_trie_max_storage_tries: usize, + state_root_task_timeout: Option, } impl DefaultEngineValues { @@ -196,6 +197,12 @@ impl DefaultEngineValues { self.sparse_trie_max_storage_tries = v; self } + + /// Set the default state root task timeout + pub fn with_state_root_task_timeout(mut self, v: Option) -> Self { + self.state_root_task_timeout = v; + self + } } impl Default for DefaultEngineValues { @@ -224,6 +231,7 @@ impl Default for DefaultEngineValues { disable_trie_cache: false, sparse_trie_prune_depth: DEFAULT_SPARSE_TRIE_PRUNE_DEPTH, sparse_trie_max_storage_tries: DEFAULT_SPARSE_TRIE_MAX_STORAGE_TRIES, + state_root_task_timeout: Some("1s".to_string()), } } } @@ -363,6 +371,21 @@ pub struct EngineArgs { /// Maximum number of storage tries to retain after sparse trie pruning. #[arg(long = "engine.sparse-trie-max-storage-tries", default_value_t = DefaultEngineValues::get_global().sparse_trie_max_storage_tries)] pub sparse_trie_max_storage_tries: usize, + + /// Configure the timeout for the state root task before spawning a sequential fallback. + /// If the state root task takes longer than this, a sequential computation starts in + /// parallel and whichever finishes first is used. + /// + /// --engine.state-root-task-timeout 1s + /// --engine.state-root-task-timeout 400ms + /// + /// Set to 0s to disable. + #[arg( + long = "engine.state-root-task-timeout", + value_parser = humantime::parse_duration, + default_value = DefaultEngineValues::get_global().state_root_task_timeout.as_deref().unwrap_or("1s"), + )] + pub state_root_task_timeout: Option, } #[allow(deprecated)] @@ -392,6 +415,7 @@ impl Default for EngineArgs { disable_trie_cache, sparse_trie_prune_depth, sparse_trie_max_storage_tries, + state_root_task_timeout, } = DefaultEngineValues::get_global().clone(); Self { persistence_threshold, @@ -421,6 +445,9 @@ impl Default for EngineArgs { disable_trie_cache, sparse_trie_prune_depth, sparse_trie_max_storage_tries, + state_root_task_timeout: state_root_task_timeout + .as_deref() + .map(|s| humantime::parse_duration(s).expect("valid default duration")), } } } @@ -453,6 +480,7 @@ impl EngineArgs { .with_disable_trie_cache(self.disable_trie_cache) .with_sparse_trie_prune_depth(self.sparse_trie_prune_depth) .with_sparse_trie_max_storage_tries(self.sparse_trie_max_storage_tries) + .with_state_root_task_timeout(self.state_root_task_timeout.filter(|d| !d.is_zero())) } } @@ -506,6 +534,7 @@ mod tests { disable_trie_cache: true, sparse_trie_prune_depth: 10, sparse_trie_max_storage_tries: 100, + state_root_task_timeout: Some(Duration::from_secs(2)), }; let parsed_args = CommandParser::::parse_from([ @@ -541,6 +570,8 @@ mod tests { "10", "--engine.sparse-trie-max-storage-tries", "100", + "--engine.state-root-task-timeout", + "2s", ]) .args; diff --git a/docs/vocs/docs/pages/cli/reth/node.mdx b/docs/vocs/docs/pages/cli/reth/node.mdx index 9877d934f5..702a1023e6 100644 --- a/docs/vocs/docs/pages/cli/reth/node.mdx +++ b/docs/vocs/docs/pages/cli/reth/node.mdx @@ -1022,6 +1022,15 @@ Engine: [default: 100] + --engine.state-root-task-timeout + Configure the timeout for the state root task before spawning a sequential fallback. If the state root task takes longer than this, a sequential computation starts in parallel and whichever finishes first is used. + + --engine.state-root-task-timeout 1s --engine.state-root-task-timeout 400ms + + Set to 0s to disable. + + [default: 1s] + ERA: --era.enable Enable import from ERA1 files