fix(engine): wait for persistence to complete in reth_newPayload (#22239)

This commit is contained in:
Alexey Shekhirin
2026-02-16 14:08:36 +00:00
committed by GitHub
parent 20ae9ac405
commit 10c6bdb5ff
3 changed files with 59 additions and 46 deletions

View File

@@ -42,7 +42,7 @@ use reth_tasks::spawn_os_thread;
use reth_trie_db::ChangesetCache;
use revm::interpreter::debug_unreachable;
use state::TreeState;
use std::{fmt::Debug, ops, sync::Arc};
use std::{fmt::Debug, ops, sync::Arc, time::Duration};
use crossbeam_channel::{Receiver, Sender};
use tokio::sync::{
@@ -1556,43 +1556,56 @@ where
self.on_maybe_tree_event(maybe_event)?;
}
BeaconEngineMessage::RethNewPayload { payload, tx } => {
let pending_persistence = self.persistence_state.rx.take();
let validator = &self.payload_validator;
// Before processing the new payload, we wait for persistence and
// cache updates to complete. We do it in parallel, spawning
// persistence and cache update wait tasks with Tokio, so that we
// can get an unbiased breakdown on how long did every step take.
//
// If we first wait for persistence, and only then for cache
// updates, we will offset the cache update waits by the duration of
// persistence, which is incorrect.
debug!(target: "engine::tree", "Waiting for persistence and caches in parallel before processing reth_newPayload");
let (persistence_tx, persistence_rx) = std::sync::mpsc::channel();
if let Some((rx, start_time, _action)) = pending_persistence {
let pending_persistence = self.persistence_state.rx.take();
let persistence_rx = if let Some((rx, start_time, _action)) =
pending_persistence
{
let (persistence_tx, persistence_rx) =
std::sync::mpsc::channel();
tokio::task::spawn_blocking(move || {
let start = Instant::now();
let result = rx.recv().ok();
let result =
rx.recv().expect("persistence state channel closed");
let _ = persistence_tx.send((
result,
start_time,
start.elapsed(),
));
});
}
Some(persistence_rx)
} else {
None
};
let cache_wait = validator.wait_for_caches();
let persistence_result = persistence_rx.try_recv().ok();
let cache_wait = self.payload_validator.wait_for_caches();
let persistence_wait =
if let Some((result, start_time, wait_duration)) =
persistence_result
{
let _ = self
.on_persistence_complete(result.flatten(), start_time);
Some(wait_duration)
} else {
None
};
let persistence_wait = if let Some(persistence_rx) = persistence_rx
{
let (result, start_time, wait_duration) = persistence_rx
.recv()
.expect("persistence result channel closed");
let _ = self.on_persistence_complete(result, start_time);
Some(wait_duration)
} else {
None
};
debug!(
target: "engine::tree",
persistence_wait = ?persistence_wait,
?persistence_wait,
execution_cache_wait = ?cache_wait.execution_cache,
sparse_trie_wait = ?cache_wait.sparse_trie,
"Peresistence finished and caches updated for reth_newPayload"
"Persistence finished and caches updated for reth_newPayload"
);
let start = Instant::now();
@@ -3123,3 +3136,23 @@ enum PersistTarget {
/// Persist all blocks up to and including the canonical head.
Head,
}
/// Result of waiting for caches to become available.
#[derive(Debug, Clone, Copy, Default)]
pub struct CacheWaitDurations {
/// Time spent waiting for the execution cache lock.
pub execution_cache: Duration,
/// Time spent waiting for the sparse trie lock.
pub sparse_trie: Duration,
}
/// Trait for types that can wait for caches to become available.
///
/// This is used by `reth_newPayload` endpoint to ensure that payload processing
/// waits for any ongoing operations to complete before starting.
pub trait WaitForCaches {
/// Waits for cache updates to complete.
///
/// Returns the time spent waiting for each cache separately.
fn wait_for_caches(&self) -> CacheWaitDurations;
}

View File

@@ -8,7 +8,7 @@ use crate::tree::{
sparse_trie::StateRootComputeOutcome,
},
sparse_trie::{SparseTrieCacheTask, SparseTrieTask, SpawnedSparseTrieTask},
StateProviderBuilder, TreeConfig,
CacheWaitDurations, StateProviderBuilder, TreeConfig, WaitForCaches,
};
use alloy_eip7928::BlockAccessList;
use alloy_eips::{eip1898::BlockWithParent, eip4895::Withdrawal};
@@ -63,26 +63,6 @@ pub mod sparse_trie;
use preserved_sparse_trie::{PreservedSparseTrie, SharedPreservedSparseTrie};
/// Result of waiting for caches to become available.
#[derive(Debug, Clone, Copy, Default)]
pub struct CacheWaitDurations {
/// Time spent waiting for the execution cache lock.
pub execution_cache: Duration,
/// Time spent waiting for the sparse trie lock.
pub sparse_trie: Duration,
}
/// Trait for types that can wait for execution cache and sparse trie locks to become available.
///
/// This is used by `reth_newPayload` endpoint to ensure that payload processing
/// waits for any ongoing operations to complete before starting.
pub trait WaitForCaches {
/// Waits for persistence and cache updates to complete.
///
/// Returns the time spent waiting for each cache separately.
fn wait_for_caches(&self) -> CacheWaitDurations;
}
/// Default parallelism thresholds to use with the [`ParallelSparseTrie`].
///
/// These values were determined by performing benchmarks using gradually increasing values to judge

View File

@@ -4,11 +4,11 @@ use crate::tree::{
cached_state::CachedStateProvider,
error::{InsertBlockError, InsertBlockErrorKind, InsertPayloadError},
instrumented_state::InstrumentedStateProvider,
payload_processor::{CacheWaitDurations, PayloadProcessor},
payload_processor::PayloadProcessor,
precompile_cache::{CachedPrecompile, CachedPrecompileMetrics, PrecompileCacheMap},
sparse_trie::StateRootComputeOutcome,
EngineApiMetrics, EngineApiTreeState, ExecutionEnv, PayloadHandle, StateProviderBuilder,
StateProviderDatabase, TreeConfig, WaitForCaches,
CacheWaitDurations, EngineApiMetrics, EngineApiTreeState, ExecutionEnv, PayloadHandle,
StateProviderBuilder, StateProviderDatabase, TreeConfig, WaitForCaches,
};
use alloy_consensus::transaction::{Either, TxHashRef};
use alloy_eip7928::BlockAccessList;