From 10c6bdb5ff464242020cea7e39ca100d7a0bcf13 Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Mon, 16 Feb 2026 14:08:36 +0000 Subject: [PATCH] fix(engine): wait for persistence to complete in reth_newPayload (#22239) --- crates/engine/tree/src/tree/mod.rs | 77 +++++++++++++------ .../tree/src/tree/payload_processor/mod.rs | 22 +----- .../engine/tree/src/tree/payload_validator.rs | 6 +- 3 files changed, 59 insertions(+), 46 deletions(-) diff --git a/crates/engine/tree/src/tree/mod.rs b/crates/engine/tree/src/tree/mod.rs index 43a247fc25..704717e9f8 100644 --- a/crates/engine/tree/src/tree/mod.rs +++ b/crates/engine/tree/src/tree/mod.rs @@ -42,7 +42,7 @@ use reth_tasks::spawn_os_thread; use reth_trie_db::ChangesetCache; use revm::interpreter::debug_unreachable; use state::TreeState; -use std::{fmt::Debug, ops, sync::Arc}; +use std::{fmt::Debug, ops, sync::Arc, time::Duration}; use crossbeam_channel::{Receiver, Sender}; use tokio::sync::{ @@ -1556,43 +1556,56 @@ where self.on_maybe_tree_event(maybe_event)?; } BeaconEngineMessage::RethNewPayload { payload, tx } => { - let pending_persistence = self.persistence_state.rx.take(); - let validator = &self.payload_validator; - + // Before processing the new payload, we wait for persistence and + // cache updates to complete. We do it in parallel, spawning + // persistence and cache update wait tasks with Tokio, so that we + // can get an unbiased breakdown on how long did every step take. + // + // If we first wait for persistence, and only then for cache + // updates, we will offset the cache update waits by the duration of + // persistence, which is incorrect. debug!(target: "engine::tree", "Waiting for persistence and caches in parallel before processing reth_newPayload"); - let (persistence_tx, persistence_rx) = std::sync::mpsc::channel(); - if let Some((rx, start_time, _action)) = pending_persistence { + + let pending_persistence = self.persistence_state.rx.take(); + let persistence_rx = if let Some((rx, start_time, _action)) = + pending_persistence + { + let (persistence_tx, persistence_rx) = + std::sync::mpsc::channel(); tokio::task::spawn_blocking(move || { let start = Instant::now(); - let result = rx.recv().ok(); + let result = + rx.recv().expect("persistence state channel closed"); let _ = persistence_tx.send(( result, start_time, start.elapsed(), )); }); - } + Some(persistence_rx) + } else { + None + }; - let cache_wait = validator.wait_for_caches(); - let persistence_result = persistence_rx.try_recv().ok(); + let cache_wait = self.payload_validator.wait_for_caches(); - let persistence_wait = - if let Some((result, start_time, wait_duration)) = - persistence_result - { - let _ = self - .on_persistence_complete(result.flatten(), start_time); - Some(wait_duration) - } else { - None - }; + let persistence_wait = if let Some(persistence_rx) = persistence_rx + { + let (result, start_time, wait_duration) = persistence_rx + .recv() + .expect("persistence result channel closed"); + let _ = self.on_persistence_complete(result, start_time); + Some(wait_duration) + } else { + None + }; debug!( target: "engine::tree", - persistence_wait = ?persistence_wait, + ?persistence_wait, execution_cache_wait = ?cache_wait.execution_cache, sparse_trie_wait = ?cache_wait.sparse_trie, - "Peresistence finished and caches updated for reth_newPayload" + "Persistence finished and caches updated for reth_newPayload" ); let start = Instant::now(); @@ -3123,3 +3136,23 @@ enum PersistTarget { /// Persist all blocks up to and including the canonical head. Head, } + +/// Result of waiting for caches to become available. +#[derive(Debug, Clone, Copy, Default)] +pub struct CacheWaitDurations { + /// Time spent waiting for the execution cache lock. + pub execution_cache: Duration, + /// Time spent waiting for the sparse trie lock. + pub sparse_trie: Duration, +} + +/// Trait for types that can wait for caches to become available. +/// +/// This is used by `reth_newPayload` endpoint to ensure that payload processing +/// waits for any ongoing operations to complete before starting. +pub trait WaitForCaches { + /// Waits for cache updates to complete. + /// + /// Returns the time spent waiting for each cache separately. + fn wait_for_caches(&self) -> CacheWaitDurations; +} diff --git a/crates/engine/tree/src/tree/payload_processor/mod.rs b/crates/engine/tree/src/tree/payload_processor/mod.rs index 3a4ea5fb1d..d012aa9e7a 100644 --- a/crates/engine/tree/src/tree/payload_processor/mod.rs +++ b/crates/engine/tree/src/tree/payload_processor/mod.rs @@ -8,7 +8,7 @@ use crate::tree::{ sparse_trie::StateRootComputeOutcome, }, sparse_trie::{SparseTrieCacheTask, SparseTrieTask, SpawnedSparseTrieTask}, - StateProviderBuilder, TreeConfig, + CacheWaitDurations, StateProviderBuilder, TreeConfig, WaitForCaches, }; use alloy_eip7928::BlockAccessList; use alloy_eips::{eip1898::BlockWithParent, eip4895::Withdrawal}; @@ -63,26 +63,6 @@ pub mod sparse_trie; use preserved_sparse_trie::{PreservedSparseTrie, SharedPreservedSparseTrie}; -/// Result of waiting for caches to become available. -#[derive(Debug, Clone, Copy, Default)] -pub struct CacheWaitDurations { - /// Time spent waiting for the execution cache lock. - pub execution_cache: Duration, - /// Time spent waiting for the sparse trie lock. - pub sparse_trie: Duration, -} - -/// Trait for types that can wait for execution cache and sparse trie locks to become available. -/// -/// This is used by `reth_newPayload` endpoint to ensure that payload processing -/// waits for any ongoing operations to complete before starting. -pub trait WaitForCaches { - /// Waits for persistence and cache updates to complete. - /// - /// Returns the time spent waiting for each cache separately. - fn wait_for_caches(&self) -> CacheWaitDurations; -} - /// Default parallelism thresholds to use with the [`ParallelSparseTrie`]. /// /// These values were determined by performing benchmarks using gradually increasing values to judge diff --git a/crates/engine/tree/src/tree/payload_validator.rs b/crates/engine/tree/src/tree/payload_validator.rs index 9d5d5e6614..388c254686 100644 --- a/crates/engine/tree/src/tree/payload_validator.rs +++ b/crates/engine/tree/src/tree/payload_validator.rs @@ -4,11 +4,11 @@ use crate::tree::{ cached_state::CachedStateProvider, error::{InsertBlockError, InsertBlockErrorKind, InsertPayloadError}, instrumented_state::InstrumentedStateProvider, - payload_processor::{CacheWaitDurations, PayloadProcessor}, + payload_processor::PayloadProcessor, precompile_cache::{CachedPrecompile, CachedPrecompileMetrics, PrecompileCacheMap}, sparse_trie::StateRootComputeOutcome, - EngineApiMetrics, EngineApiTreeState, ExecutionEnv, PayloadHandle, StateProviderBuilder, - StateProviderDatabase, TreeConfig, WaitForCaches, + CacheWaitDurations, EngineApiMetrics, EngineApiTreeState, ExecutionEnv, PayloadHandle, + StateProviderBuilder, StateProviderDatabase, TreeConfig, WaitForCaches, }; use alloy_consensus::transaction::{Either, TxHashRef}; use alloy_eip7928::BlockAccessList;