mirror of https://github.com/paradigmxyz/reth.git
feat(engine): add state root task timeout with sequential fallback (#22004)
Co-authored-by: Amp <amp@ampcode.com>
commit 5bccdc4a5d (parent 0b7cd60668)
@@ -1,6 +1,7 @@
 //! Engine tree configuration.

 use alloy_eips::merge::EPOCH_SLOTS;
+use core::time::Duration;

 /// Triggers persistence when the number of canonical blocks in memory exceeds this threshold.
 pub const DEFAULT_PERSISTENCE_THRESHOLD: u64 = 2;
@@ -64,6 +65,9 @@ pub const DEFAULT_SPARSE_TRIE_PRUNE_DEPTH: usize = 4;
 /// Storage tries beyond this limit are cleared (but allocations preserved).
 pub const DEFAULT_SPARSE_TRIE_MAX_STORAGE_TRIES: usize = 100;

+/// Default timeout for the state root task before spawning a sequential fallback.
+pub const DEFAULT_STATE_ROOT_TASK_TIMEOUT: Duration = Duration::from_secs(1);
+
 const DEFAULT_BLOCK_BUFFER_LIMIT: u32 = EPOCH_SLOTS as u32 * 2;
 const DEFAULT_MAX_INVALID_HEADER_CACHE_LENGTH: u32 = 256;
 const DEFAULT_MAX_EXECUTE_BLOCK_BATCH_SIZE: usize = 4;
@@ -175,6 +179,11 @@ pub struct TreeConfig {
     sparse_trie_prune_depth: usize,
     /// Maximum number of storage tries to retain after pruning.
     sparse_trie_max_storage_tries: usize,
+    /// Timeout for the state root task before spawning a sequential fallback computation.
+    /// If `Some`, after waiting this duration for the state root task, a sequential state root
+    /// computation is spawned in parallel and whichever finishes first is used.
+    /// If `None`, the timeout fallback is disabled.
+    state_root_task_timeout: Option<Duration>,
 }

 impl Default for TreeConfig {
@@ -207,6 +216,7 @@ impl Default for TreeConfig {
             disable_trie_cache: false,
             sparse_trie_prune_depth: DEFAULT_SPARSE_TRIE_PRUNE_DEPTH,
             sparse_trie_max_storage_tries: DEFAULT_SPARSE_TRIE_MAX_STORAGE_TRIES,
+            state_root_task_timeout: Some(DEFAULT_STATE_ROOT_TASK_TIMEOUT),
         }
     }
 }
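With that default wiring, the fallback is on out of the box. A minimal sketch of what this implies for callers, assuming `TreeConfig` and the constant are in scope (e.g. via `reth_engine_primitives`):

```rust
use core::time::Duration;

// Defaults to a 1s timeout, per DEFAULT_STATE_ROOT_TASK_TIMEOUT above.
let config = TreeConfig::default();
assert_eq!(config.state_root_task_timeout(), Some(Duration::from_secs(1)));
```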
@@ -241,6 +251,7 @@ impl TreeConfig {
         disable_cache_metrics: bool,
         sparse_trie_prune_depth: usize,
         sparse_trie_max_storage_tries: usize,
+        state_root_task_timeout: Option<Duration>,
     ) -> Self {
         Self {
             persistence_threshold,
@@ -270,6 +281,7 @@ impl TreeConfig {
             disable_trie_cache: false,
             sparse_trie_prune_depth,
             sparse_trie_max_storage_tries,
+            state_root_task_timeout,
         }
     }
@@ -618,4 +630,15 @@ impl TreeConfig {
         self.sparse_trie_max_storage_tries = max_tries;
         self
     }
+
+    /// Returns the state root task timeout.
+    pub const fn state_root_task_timeout(&self) -> Option<Duration> {
+        self.state_root_task_timeout
+    }
+
+    /// Setter for state root task timeout.
+    pub const fn with_state_root_task_timeout(mut self, timeout: Option<Duration>) -> Self {
+        self.state_root_task_timeout = timeout;
+        self
+    }
 }
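For illustration, a hedged sketch of the builder in use — method names come from this diff; `TreeConfig` being in scope is assumed:

```rust
use core::time::Duration;

// Tighten the fallback trigger to 400ms...
let eager = TreeConfig::default()
    .with_state_root_task_timeout(Some(Duration::from_millis(400)));
assert_eq!(eager.state_root_task_timeout(), Some(Duration::from_millis(400)));

// ...or opt out of the timeout fallback entirely.
let patient = TreeConfig::default().with_state_root_task_timeout(None);
assert_eq!(patient.state_root_task_timeout(), None);
```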
@@ -434,6 +434,8 @@ pub struct BlockValidationMetrics {
     pub state_root_parallel_fallback_total: Counter,
     /// Total number of times the state root task failed but the fallback succeeded.
     pub state_root_task_fallback_success_total: Counter,
+    /// Total number of times the state root task timed out and a sequential fallback was spawned.
+    pub state_root_task_timeout_total: Counter,
     /// Latest state root duration, ie the time spent blocked waiting for the state root.
     pub state_root_duration: Gauge,
     /// Histogram for state root duration ie the time spent blocked waiting for the state root
@@ -732,6 +732,18 @@ impl<Tx, Err, R: Send + Sync + 'static> PayloadHandle<Tx, Err, R> {
             .map_err(|_| ParallelStateRootError::Other("sparse trie task dropped".to_string()))?
     }

+    /// Takes the state root receiver out of the handle for use with custom waiting logic
+    /// (e.g., timeout-based waiting).
+    ///
+    /// # Panics
+    ///
+    /// If payload processing was started without background tasks.
+    pub const fn take_state_root_rx(
+        &mut self,
+    ) -> mpsc::Receiver<Result<StateRootComputeOutcome, ParallelStateRootError>> {
+        self.state_root.take().expect("state_root is None")
+    }
+
     /// Returns a state hook to be used to send state updates to this task.
     ///
     /// If a multiproof task is spawned the hook will notify it about new states.
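The taken receiver is a plain `std::sync::mpsc::Receiver`, so callers can apply any waiting strategy. A minimal sketch of the timeout-based waiting the doc comment alludes to (`handle` is hypothetical; error handling elided):

```rust
use std::{sync::mpsc::RecvTimeoutError, time::Duration};

let rx = handle.take_state_root_rx(); // panics if no background task was spawned
match rx.recv_timeout(Duration::from_secs(1)) {
    Ok(result) => { /* task produced a state root (or a task error) in time */ }
    Err(RecvTimeoutError::Timeout) => { /* still running: time to spawn a fallback */ }
    Err(RecvTimeoutError::Disconnected) => { /* the task dropped its sender */ }
}
```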
@@ -49,7 +49,7 @@ use revm_primitives::Address;
 use std::{
     collections::HashMap,
     panic::{self, AssertUnwindSafe},
-    sync::Arc,
+    sync::{mpsc::RecvTimeoutError, Arc},
     time::Instant,
 };
 use tracing::{debug, debug_span, error, info, instrument, trace, warn};
@@ -520,7 +520,17 @@
         match strategy {
             StateRootStrategy::StateRootTask => {
                 debug!(target: "engine::tree::payload_validator", "Using sparse trie state root algorithm");
-                match handle.state_root() {
+                let task_result = ensure_ok_post_block!(
+                    self.await_state_root_with_timeout(
+                        &mut handle,
+                        overlay_factory.clone(),
+                        &hashed_state,
+                    ),
+                    block
+                );
+
+                match task_result {
                     Ok(StateRootComputeOutcome { state_root, trie_updates }) => {
                         let elapsed = root_time.elapsed();
                         info!(target: "engine::tree::payload_validator", ?state_root, ?elapsed, "State root task finished");
@@ -591,7 +601,7 @@
                 }

                 let (root, updates) = ensure_ok_post_block!(
-                    self.compute_state_root_serial(overlay_factory.clone(), &hashed_state),
+                    Self::compute_state_root_serial(overlay_factory.clone(), &hashed_state),
                     block
                 );
@@ -889,7 +899,6 @@
     /// [`HashedPostState`] containing the changes of this block, to compute the state root and
     /// trie updates for this block.
     fn compute_state_root_serial(
-        &self,
         overlay_factory: OverlayStateProviderFactory<P>,
         hashed_state: &HashedPostState,
     ) -> ProviderResult<(B256, TrieUpdates)> {
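Dropping `&self` here turns `compute_state_root_serial` into an associated function, which is what lets the fallback path below move it into a `spawn_blocking` closure without borrowing the validator.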
@@ -907,6 +916,96 @@
             .root_with_updates()?)
     }

+    /// Awaits the state root from the background task, with an optional timeout fallback.
+    ///
+    /// If a timeout is configured (`state_root_task_timeout`), this method first waits for the
+    /// state root task up to the timeout duration. If the task doesn't complete in time, a
+    /// sequential state root computation is spawned via `spawn_blocking`. Both computations
+    /// then race: the main thread polls the task receiver and the sequential result channel
+    /// in a loop, returning whichever finishes first.
+    ///
+    /// If no timeout is configured, this simply awaits the state root task without any fallback.
+    ///
+    /// Returns `ProviderResult<Result<...>>` where the outer `ProviderResult` captures
+    /// unrecoverable errors from the sequential fallback (e.g. DB errors), while the inner
+    /// `Result` captures parallel state root task errors that can still fall back to serial.
+    fn await_state_root_with_timeout<Tx, Err, R: Send + Sync + 'static>(
+        &self,
+        handle: &mut PayloadHandle<Tx, Err, R>,
+        overlay_factory: OverlayStateProviderFactory<P>,
+        hashed_state: &HashedPostState,
+    ) -> ProviderResult<Result<StateRootComputeOutcome, ParallelStateRootError>> {
+        let Some(timeout) = self.config.state_root_task_timeout() else {
+            return Ok(handle.state_root());
+        };
+
+        let task_rx = handle.take_state_root_rx();
+
+        match task_rx.recv_timeout(timeout) {
+            Ok(result) => Ok(result),
+            Err(RecvTimeoutError::Disconnected) => {
+                Ok(Err(ParallelStateRootError::Other("sparse trie task dropped".to_string())))
+            }
+            Err(RecvTimeoutError::Timeout) => {
+                warn!(
+                    target: "engine::tree::payload_validator",
+                    ?timeout,
+                    "State root task timed out, spawning sequential fallback"
+                );
+                self.metrics.block_validation.state_root_task_timeout_total.increment(1);
+
+                let (seq_tx, seq_rx) =
+                    std::sync::mpsc::channel::<ProviderResult<(B256, TrieUpdates)>>();
+
+                let seq_overlay = overlay_factory;
+                let seq_hashed_state = hashed_state.clone();
+                self.payload_processor.executor().spawn_blocking(move || {
+                    let result = Self::compute_state_root_serial(seq_overlay, &seq_hashed_state);
+                    let _ = seq_tx.send(result);
+                });
+
+                const POLL_INTERVAL: std::time::Duration = std::time::Duration::from_millis(10);
+
+                loop {
+                    match task_rx.recv_timeout(POLL_INTERVAL) {
+                        Ok(result) => {
+                            debug!(
+                                target: "engine::tree::payload_validator",
+                                source = "task",
+                                "State root timeout race won"
+                            );
+                            return Ok(result);
+                        }
+                        Err(RecvTimeoutError::Disconnected) => {
+                            debug!(
+                                target: "engine::tree::payload_validator",
+                                "State root task dropped, waiting for sequential fallback"
+                            );
+                            let result = seq_rx.recv().map_err(|_| {
+                                ProviderError::other(std::io::Error::other(
+                                    "both state root computations failed",
+                                ))
+                            })?;
+                            let (state_root, trie_updates) = result?;
+                            return Ok(Ok(StateRootComputeOutcome { state_root, trie_updates }));
+                        }
+                        Err(RecvTimeoutError::Timeout) => {}
+                    }

+                    if let Ok(result) = seq_rx.try_recv() {
+                        debug!(
+                            target: "engine::tree::payload_validator",
+                            source = "sequential",
+                            "State root timeout race won"
+                        );
+                        let (state_root, trie_updates) = result?;
+                        return Ok(Ok(StateRootComputeOutcome { state_root, trie_updates }));
+                    }
+                }
+            }
+        }
+    }
+
     /// Compares trie updates from the state root task with serial state root computation.
     ///
     /// This is used for debugging and validating the correctness of the parallel state root
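Stripped of the reth-specific types, the racing logic above reduces to a small, self-contained pattern. A hedged sketch using only the standard library (`T` stands in for the state root outcome; names are illustrative, not from the diff):

```rust
use std::{sync::mpsc, thread, time::Duration};

/// Wait on `primary` up to `timeout`; on timeout, spawn `fallback_fn` on a
/// new thread and poll both sides until one produces a value.
fn race<T: Send + 'static>(
    primary: mpsc::Receiver<T>,
    timeout: Duration,
    fallback_fn: impl FnOnce() -> T + Send + 'static,
) -> Option<T> {
    match primary.recv_timeout(timeout) {
        Ok(v) => return Some(v),
        // Mirrors the diff: a pre-timeout disconnect is reported, not raced.
        Err(mpsc::RecvTimeoutError::Disconnected) => return None,
        Err(mpsc::RecvTimeoutError::Timeout) => {}
    }

    let (tx, fallback) = mpsc::channel();
    thread::spawn(move || {
        let _ = tx.send(fallback_fn());
    });

    const POLL_INTERVAL: Duration = Duration::from_millis(10);
    loop {
        match primary.recv_timeout(POLL_INTERVAL) {
            // The original task may still win after the fallback starts.
            Ok(v) => return Some(v),
            // Task gone: only the fallback can answer now, so block on it.
            Err(mpsc::RecvTimeoutError::Disconnected) => return fallback.recv().ok(),
            Err(mpsc::RecvTimeoutError::Timeout) => {}
        }
        if let Ok(v) = fallback.try_recv() {
            return Some(v);
        }
    }
}
```

The 10ms poll interval matches `POLL_INTERVAL` in the diff; it bounds how long the winner of either channel can sit unnoticed while the other is being checked.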
@@ -921,7 +1020,7 @@
     ) {
         debug!(target: "engine::tree::payload_validator", "Comparing trie updates with serial computation");

-        match self.compute_state_root_serial(overlay_factory.clone(), hashed_state) {
+        match Self::compute_state_root_serial(overlay_factory.clone(), hashed_state) {
             Ok((serial_root, serial_trie_updates)) => {
                 debug!(
                     target: "engine::tree::payload_validator",
@@ -5,7 +5,7 @@ use reth_engine_primitives::{
     TreeConfig, DEFAULT_MULTIPROOF_TASK_CHUNK_SIZE, DEFAULT_SPARSE_TRIE_MAX_STORAGE_TRIES,
     DEFAULT_SPARSE_TRIE_PRUNE_DEPTH,
 };
-use std::sync::OnceLock;
+use std::{sync::OnceLock, time::Duration};

 use crate::node_config::{
     DEFAULT_CROSS_BLOCK_CACHE_SIZE_MB, DEFAULT_MEMORY_BLOCK_BUFFER_TARGET,
@@ -43,6 +43,7 @@ pub struct DefaultEngineValues {
     disable_trie_cache: bool,
     sparse_trie_prune_depth: usize,
     sparse_trie_max_storage_tries: usize,
+    state_root_task_timeout: Option<String>,
 }

 impl DefaultEngineValues {
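Storing the default as an `Option<String>` rather than a `Duration` keeps it in humantime form, so it can be handed straight to clap's `default_value` in the `EngineArgs` definition further down.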
@@ -196,6 +197,12 @@ impl DefaultEngineValues {
         self.sparse_trie_max_storage_tries = v;
         self
     }
+
+    /// Set the default state root task timeout
+    pub fn with_state_root_task_timeout(mut self, v: Option<String>) -> Self {
+        self.state_root_task_timeout = v;
+        self
+    }
 }

 impl Default for DefaultEngineValues {
@@ -224,6 +231,7 @@ impl Default for DefaultEngineValues {
             disable_trie_cache: false,
             sparse_trie_prune_depth: DEFAULT_SPARSE_TRIE_PRUNE_DEPTH,
             sparse_trie_max_storage_tries: DEFAULT_SPARSE_TRIE_MAX_STORAGE_TRIES,
+            state_root_task_timeout: Some("1s".to_string()),
         }
     }
 }
@@ -363,6 +371,21 @@ pub struct EngineArgs {
     /// Maximum number of storage tries to retain after sparse trie pruning.
     #[arg(long = "engine.sparse-trie-max-storage-tries", default_value_t = DefaultEngineValues::get_global().sparse_trie_max_storage_tries)]
     pub sparse_trie_max_storage_tries: usize,

+    /// Configure the timeout for the state root task before spawning a sequential fallback.
+    /// If the state root task takes longer than this, a sequential computation starts in
+    /// parallel and whichever finishes first is used.
+    ///
+    /// --engine.state-root-task-timeout 1s
+    /// --engine.state-root-task-timeout 400ms
+    ///
+    /// Set to 0s to disable.
+    #[arg(
+        long = "engine.state-root-task-timeout",
+        value_parser = humantime::parse_duration,
+        default_value = DefaultEngineValues::get_global().state_root_task_timeout.as_deref().unwrap_or("1s"),
+    )]
+    pub state_root_task_timeout: Option<Duration>,
 }

 #[allow(deprecated)]
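The flag accepts any `humantime` duration literal; `humantime::parse_duration` is the clap `value_parser` named above. A quick sketch of the formats the examples rely on:

```rust
use std::time::Duration;

assert_eq!(humantime::parse_duration("1s").unwrap(), Duration::from_secs(1));
assert_eq!(humantime::parse_duration("400ms").unwrap(), Duration::from_millis(400));
// "0s" parses to a zero duration, which tree_config() later filters out to disable the fallback.
assert_eq!(humantime::parse_duration("0s").unwrap(), Duration::ZERO);
```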
@@ -392,6 +415,7 @@ impl Default for EngineArgs {
             disable_trie_cache,
             sparse_trie_prune_depth,
             sparse_trie_max_storage_tries,
+            state_root_task_timeout,
         } = DefaultEngineValues::get_global().clone();
         Self {
             persistence_threshold,
@@ -421,6 +445,9 @@
             disable_trie_cache,
             sparse_trie_prune_depth,
             sparse_trie_max_storage_tries,
+            state_root_task_timeout: state_root_task_timeout
+                .as_deref()
+                .map(|s| humantime::parse_duration(s).expect("valid default duration")),
         }
     }
 }
@@ -453,6 +480,7 @@ impl EngineArgs {
             .with_disable_trie_cache(self.disable_trie_cache)
             .with_sparse_trie_prune_depth(self.sparse_trie_prune_depth)
             .with_sparse_trie_max_storage_tries(self.sparse_trie_max_storage_tries)
+            .with_state_root_task_timeout(self.state_root_task_timeout.filter(|d| !d.is_zero()))
     }
 }
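The `filter` on the last line is how `0s` disables the fallback: a zero duration becomes `None` before it reaches `TreeConfig`. In isolation:

```rust
use std::time::Duration;

// A zero duration is filtered away; any non-zero duration passes through.
assert_eq!(Some(Duration::ZERO).filter(|d| !d.is_zero()), None);
assert_eq!(
    Some(Duration::from_secs(2)).filter(|d| !d.is_zero()),
    Some(Duration::from_secs(2))
);
```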
@@ -506,6 +534,7 @@ mod tests {
             disable_trie_cache: true,
             sparse_trie_prune_depth: 10,
             sparse_trie_max_storage_tries: 100,
+            state_root_task_timeout: Some(Duration::from_secs(2)),
         };

         let parsed_args = CommandParser::<EngineArgs>::parse_from([
@@ -541,6 +570,8 @@
             "10",
             "--engine.sparse-trie-max-storage-tries",
             "100",
+            "--engine.state-root-task-timeout",
+            "2s",
         ])
         .args;
@@ -1022,6 +1022,15 @@ Engine:

           [default: 100]

+      --engine.state-root-task-timeout <STATE_ROOT_TASK_TIMEOUT>
+          Configure the timeout for the state root task before spawning a sequential fallback. If the state root task takes longer than this, a sequential computation starts in parallel and whichever finishes first is used.
+
+          --engine.state-root-task-timeout 1s --engine.state-root-task-timeout 400ms
+
+          Set to 0s to disable.
+
+          [default: 1s]
+
 ERA:
       --era.enable
           Enable import from ERA1 files