Compare commits

...

5 Commits

Author SHA1 Message Date
Sergei Shulepov
5732c3ce5f engine: fix rebase CI fallout 2026-03-26 17:06:52 +00:00
Sergei Shulepov
32c55d3c48 tidy up 2026-03-26 17:03:02 +00:00
Sergei Shulepov
624a1eb428 engine-primitives: make forkchoice test deterministic 2026-03-26 17:03:02 +00:00
Sergei Shulepov
8fbcf7f78b engine: fix backpressure stall handling 2026-03-26 17:03:02 +00:00
Sergei Shulepov
b37a77f945 engine: add persistence backpressure for beacon requests 2026-03-26 17:03:02 +00:00
13 changed files with 715 additions and 170 deletions

View File

@@ -216,6 +216,7 @@ impl Command {
let new_payload_result = NewPayloadResult {
gas_used,
latency: np_latency,
backpressure_wait: server_timings.as_ref().and_then(|t| t.backpressure_wait),
persistence_wait: server_timings.as_ref().and_then(|t| t.persistence_wait),
execution_cache_wait: server_timings
.as_ref()

View File

@@ -142,6 +142,7 @@ impl Command {
let new_payload_result = NewPayloadResult {
gas_used,
latency,
backpressure_wait: server_timings.as_ref().and_then(|t| t.backpressure_wait),
persistence_wait: server_timings.as_ref().and_then(|t| t.persistence_wait),
execution_cache_wait: server_timings
.as_ref()

View File

@@ -25,6 +25,8 @@ pub(crate) struct NewPayloadResult {
pub(crate) gas_used: u64,
/// The latency of the `newPayload` call.
pub(crate) latency: Duration,
/// Time spent waiting in the tree backpressure queue. `None` when the message was not queued.
pub(crate) backpressure_wait: Option<Duration>,
/// Time spent waiting for persistence. `None` when no persistence was in-flight.
pub(crate) persistence_wait: Option<Duration>,
/// Time spent waiting for execution cache lock.
@@ -61,9 +63,11 @@ impl Serialize for NewPayloadResult {
{
// convert the time to microseconds
let time = self.latency.as_micros();
let mut state = serializer.serialize_struct("NewPayloadResult", 5)?;
let mut state = serializer.serialize_struct("NewPayloadResult", 6)?;
state.serialize_field("gas_used", &self.gas_used)?;
state.serialize_field("latency", &time)?;
state
.serialize_field("backpressure_wait", &self.backpressure_wait.map(|d| d.as_micros()))?;
state.serialize_field("persistence_wait", &self.persistence_wait.map(|d| d.as_micros()))?;
state.serialize_field("execution_cache_wait", &self.execution_cache_wait.as_micros())?;
state.serialize_field("sparse_trie_wait", &self.sparse_trie_wait.as_micros())?;
@@ -119,6 +123,9 @@ impl std::fmt::Display for CombinedResult {
if let Some(d) = np.persistence_wait {
write!(f, ", persistence wait: {d:?}")?;
}
if let Some(d) = np.backpressure_wait {
write!(f, ", backpressure wait: {d:?}")?;
}
Ok(())
}
}
@@ -134,7 +141,7 @@ impl Serialize for CombinedResult {
let fcu_latency = self.fcu_latency.as_micros();
let new_payload_latency = self.new_payload_result.latency.as_micros();
let total_latency = self.total_latency.as_micros();
let mut state = serializer.serialize_struct("CombinedResult", 10)?;
let mut state = serializer.serialize_struct("CombinedResult", 11)?;
// flatten the new payload result because this is meant for CSV writing
state.serialize_field("block_number", &self.block_number)?;
@@ -144,6 +151,10 @@ impl Serialize for CombinedResult {
state.serialize_field("new_payload_latency", &new_payload_latency)?;
state.serialize_field("fcu_latency", &fcu_latency)?;
state.serialize_field("total_latency", &total_latency)?;
state.serialize_field(
"backpressure_wait",
&self.new_payload_result.backpressure_wait.map(|d| d.as_micros()),
)?;
state.serialize_field(
"persistence_wait",
&self.new_payload_result.persistence_wait.map(|d| d.as_micros()),
@@ -315,4 +326,37 @@ mod tests {
let second_line = result.next().unwrap().unwrap();
assert_eq!(second_line, expected_second_line);
}
#[test]
fn test_write_combined_result_csv_includes_backpressure_wait() {
let result = CombinedResult {
block_number: 1,
gas_limit: 30_000_000,
transaction_count: 10,
new_payload_result: NewPayloadResult {
gas_used: 1_000,
latency: Duration::from_micros(2_000),
backpressure_wait: Some(Duration::from_micros(300)),
persistence_wait: Some(Duration::from_micros(400)),
execution_cache_wait: Duration::from_micros(500),
sparse_trie_wait: Duration::from_micros(600),
},
fcu_latency: Duration::from_micros(700),
total_latency: Duration::from_micros(2_700),
};
let mut writer = Writer::from_writer(vec![]);
writer.serialize(result).unwrap();
let result = writer.into_inner().unwrap();
let mut result = result.as_slice().lines();
let expected_first_line = "block_number,gas_limit,transaction_count,gas_used,new_payload_latency,fcu_latency,total_latency,backpressure_wait,persistence_wait,execution_cache_wait,sparse_trie_wait";
let first_line = result.next().unwrap().unwrap();
assert_eq!(first_line, expected_first_line);
let expected_second_line = "1,30000000,10,1000,2000,700,2700,300,400,500,600";
let second_line = result.next().unwrap().unwrap();
assert_eq!(second_line, expected_second_line);
}
}

View File

@@ -264,6 +264,7 @@ impl Command {
let new_payload_result = NewPayloadResult {
gas_used,
latency: np_latency,
backpressure_wait: server_timings.as_ref().and_then(|t| t.backpressure_wait),
persistence_wait: server_timings.as_ref().and_then(|t| t.persistence_wait),
execution_cache_wait: server_timings
.as_ref()

View File

@@ -313,6 +313,8 @@ pub(crate) async fn call_new_payload<N: Network, P: Provider<N>>(
struct RethPayloadStatus {
latency_us: u64,
#[serde(default)]
backpressure_wait_us: Option<u64>,
#[serde(default)]
persistence_wait_us: Option<u64>,
#[serde(default)]
execution_cache_wait_us: u64,
@@ -325,6 +327,8 @@ struct RethPayloadStatus {
pub(crate) struct NewPayloadTimingBreakdown {
/// Server-side execution latency.
pub(crate) latency: Duration,
/// Time spent waiting in the backpressure queue. `None` when the message was not queued.
pub(crate) backpressure_wait: Option<Duration>,
/// Time spent waiting for persistence. `None` when no persistence was in-flight.
pub(crate) persistence_wait: Option<Duration>,
/// Time spent waiting for execution cache lock.
@@ -374,6 +378,7 @@ pub(crate) async fn call_new_payload_with_reth<N: Network, P: Provider<N>>(
Ok(Some(NewPayloadTimingBreakdown {
latency: Duration::from_micros(resp.latency_us),
backpressure_wait: resp.backpressure_wait_us.map(Duration::from_micros),
persistence_wait: resp.persistence_wait_us.map(Duration::from_micros),
execution_cache_wait: Duration::from_micros(resp.execution_cache_wait_us),
sparse_trie_wait: Duration::from_micros(resp.sparse_trie_wait_us),

View File

@@ -6,6 +6,9 @@ use core::time::Duration;
/// Triggers persistence when the number of canonical blocks in memory exceeds this threshold.
pub const DEFAULT_PERSISTENCE_THRESHOLD: u64 = 2;
/// Maximum canonical-minus-persisted gap before engine API processing is stalled.
pub const DEFAULT_PERSISTENCE_BACKPRESSURE_THRESHOLD: u64 = 16;
/// How close to the canonical head we persist blocks.
pub const DEFAULT_MEMORY_BLOCK_BUFFER_TARGET: u64 = 0;
@@ -44,6 +47,16 @@ const DEFAULT_MAX_INVALID_HEADER_CACHE_LENGTH: u32 = 256;
const DEFAULT_MAX_EXECUTE_BLOCK_BATCH_SIZE: usize = 4;
const DEFAULT_CROSS_BLOCK_CACHE_SIZE: usize = default_cross_block_cache_size();
const fn assert_backpressure_threshold_invariant(
persistence_threshold: u64,
persistence_backpressure_threshold: u64,
) {
debug_assert!(
persistence_backpressure_threshold > persistence_threshold,
"persistence_backpressure_threshold must be greater than persistence_threshold",
);
}
const fn default_cross_block_cache_size() -> usize {
if cfg!(test) {
1024 * 1024 // 1 MB in tests
@@ -82,6 +95,8 @@ pub struct TreeConfig {
///
/// Note: this should be less than or equal to `persistence_threshold`.
memory_block_buffer_target: u64,
/// Maximum canonical-minus-persisted gap before engine API processing is stalled.
persistence_backpressure_threshold: u64,
/// Number of pending blocks that cannot be executed due to missing parent and
/// are kept in cache.
block_buffer_limit: u32,
@@ -162,9 +177,14 @@ pub struct TreeConfig {
impl Default for TreeConfig {
fn default() -> Self {
assert_backpressure_threshold_invariant(
DEFAULT_PERSISTENCE_THRESHOLD,
DEFAULT_PERSISTENCE_BACKPRESSURE_THRESHOLD,
);
Self {
persistence_threshold: DEFAULT_PERSISTENCE_THRESHOLD,
memory_block_buffer_target: DEFAULT_MEMORY_BLOCK_BUFFER_TARGET,
persistence_backpressure_threshold: DEFAULT_PERSISTENCE_BACKPRESSURE_THRESHOLD,
block_buffer_limit: DEFAULT_BLOCK_BUFFER_LIMIT,
max_invalid_header_cache_length: DEFAULT_MAX_INVALID_HEADER_CACHE_LENGTH,
max_execute_block_batch_size: DEFAULT_MAX_EXECUTE_BLOCK_BATCH_SIZE,
@@ -201,6 +221,7 @@ impl TreeConfig {
pub const fn new(
persistence_threshold: u64,
memory_block_buffer_target: u64,
persistence_backpressure_threshold: u64,
block_buffer_limit: u32,
max_invalid_header_cache_length: u32,
max_execute_block_batch_size: usize,
@@ -225,9 +246,14 @@ impl TreeConfig {
state_root_task_timeout: Option<Duration>,
share_execution_cache_with_payload_builder: bool,
) -> Self {
assert_backpressure_threshold_invariant(
persistence_threshold,
persistence_backpressure_threshold,
);
Self {
persistence_threshold,
memory_block_buffer_target,
persistence_backpressure_threshold,
block_buffer_limit,
max_invalid_header_cache_length,
max_execute_block_batch_size,
@@ -267,6 +293,11 @@ impl TreeConfig {
self.memory_block_buffer_target
}
/// Return the persistence backpressure threshold.
pub const fn persistence_backpressure_threshold(&self) -> u64 {
self.persistence_backpressure_threshold
}
/// Return the block buffer limit.
pub const fn block_buffer_limit(&self) -> u32 {
self.block_buffer_limit
@@ -363,6 +394,10 @@ impl TreeConfig {
/// Setter for persistence threshold.
pub const fn with_persistence_threshold(mut self, persistence_threshold: u64) -> Self {
self.persistence_threshold = persistence_threshold;
assert_backpressure_threshold_invariant(
self.persistence_threshold,
self.persistence_backpressure_threshold,
);
self
}
@@ -375,6 +410,19 @@ impl TreeConfig {
self
}
/// Setter for persistence backpressure threshold.
pub const fn with_persistence_backpressure_threshold(
mut self,
persistence_backpressure_threshold: u64,
) -> Self {
self.persistence_backpressure_threshold = persistence_backpressure_threshold;
assert_backpressure_threshold_invariant(
self.persistence_threshold,
self.persistence_backpressure_threshold,
);
self
}
/// Setter for block buffer limit.
pub const fn with_block_buffer_limit(mut self, block_buffer_limit: u32) -> Self {
self.block_buffer_limit = block_buffer_limit;
@@ -592,3 +640,18 @@ impl TreeConfig {
self
}
}
#[cfg(test)]
mod tests {
use super::TreeConfig;
#[test]
#[should_panic(
expected = "persistence_backpressure_threshold must be greater than persistence_threshold"
)]
fn rejects_backpressure_threshold_at_or_below_persistence_threshold() {
let _ = TreeConfig::default()
.with_persistence_threshold(4)
.with_persistence_backpressure_threshold(4);
}
}

View File

@@ -402,10 +402,10 @@ mod tests {
#[test]
fn test_forkchoice_state_hash_find() {
// Define example hashes
let head_hash = B256::random();
let safe_hash = B256::random();
let finalized_hash = B256::random();
let non_matching_hash = B256::random();
let head_hash = B256::with_last_byte(1);
let safe_hash = B256::with_last_byte(2);
let finalized_hash = B256::with_last_byte(3);
let non_matching_hash = B256::with_last_byte(4);
// Create a ForkchoiceState with specific hashes
let state = ForkchoiceState {

View File

@@ -148,6 +148,10 @@ impl Future for PendingPayloadId {
pub struct NewPayloadTimings {
/// Server-side execution latency.
pub latency: Duration,
/// Time spent waiting in the backpressure queue before processing.
///
/// `None` when the message was not queued.
pub backpressure_wait: Option<Duration>,
/// Time spent waiting for persistence to complete.
///
/// `None` when wasn't asked to wait for persistence.

View File

@@ -171,6 +171,12 @@ pub struct EngineMetrics {
pub(crate) executed_new_block_cache_miss: Counter,
/// Histogram of persistence operation durations (in seconds)
pub(crate) persistence_duration: Histogram,
/// Histogram of time newPayload-style requests spend buffered in the backpressure queue.
pub(crate) new_payload_backpressure_wait_seconds: Histogram,
/// Histogram of time forkchoiceUpdated requests spend buffered in the backpressure queue.
pub(crate) fcu_backpressure_wait_seconds: Histogram,
/// Current number of buffered beacon messages in the tree-local queue.
pub(crate) backpressure_buffer_len: Gauge,
/// Tracks the how often we failed to deliver a newPayload response.
///
/// This effectively tracks how often the message sender dropped the channel and indicates a CL

View File

@@ -40,7 +40,13 @@ use reth_tasks::{spawn_os_thread, utils::increase_thread_priority};
use reth_trie_db::ChangesetCache;
use revm::interpreter::debug_unreachable;
use state::TreeState;
use std::{collections::HashMap, fmt::Debug, ops, sync::Arc, time::Duration};
use std::{
collections::{HashMap, VecDeque},
fmt::Debug,
ops,
sync::Arc,
time::Duration,
};
use crossbeam_channel::{Receiver, Sender};
use tokio::sync::{
@@ -250,6 +256,12 @@ pub enum TreeAction {
},
}
#[derive(Debug)]
enum StashedEngineMessage<T: PayloadTypes, N: NodePrimitives> {
Beacon { stashed_at: Instant, message: BeaconEngineMessage<T> },
Other(FromEngine<EngineApiRequest<T, N>, N::Block>),
}
/// The engine API tree handler implementation.
///
/// This type is responsible for processing engine API requests, maintaining the canonical state and
@@ -276,6 +288,8 @@ where
incoming_tx: Sender<FromEngine<EngineApiRequest<T, N>, N::Block>>,
/// Incoming engine API requests.
incoming: Receiver<FromEngine<EngineApiRequest<T, N>, N::Block>>,
/// Stashed engine messages waiting to be processed once persistence catches up.
stashed_engine_messages: VecDeque<StashedEngineMessage<T, N>>,
/// Outgoing events that are emitted to the handler.
outgoing: UnboundedSender<EngineApiEvent<N>>,
/// Channels to the persistence layer.
@@ -321,6 +335,7 @@ where
.field("payload_validator", &self.payload_validator)
.field("state", &self.state)
.field("incoming_tx", &self.incoming_tx)
.field("stashed_engine_messages", &self.stashed_engine_messages.len())
.field("persistence", &self.persistence)
.field("persistence_state", &self.persistence_state)
.field("backfill_sync_state", &self.backfill_sync_state)
@@ -381,6 +396,7 @@ where
consensus,
payload_validator,
incoming,
stashed_engine_messages: VecDeque::new(),
outgoing,
persistence,
persistence_state,
@@ -472,12 +488,169 @@ where
self.incoming_tx.clone()
}
fn update_backpressure_stash_len_metric(&self) {
self.metrics.engine.backpressure_buffer_len.set(self.stashed_engine_messages.len() as f64);
}
/// Stashes an incoming engine message without processing it. Called from
/// `wait_for_persistence_event` to collect messages that arrive while we are blocked
/// waiting for persistence to complete.
fn stash_incoming_message(
&mut self,
message: FromEngine<EngineApiRequest<T, N>, N::Block>,
) -> Result<(), InsertBlockFatalError> {
match message {
FromEngine::Request(EngineApiRequest::Beacon(request)) => {
self.stash_beacon_message(request);
}
other => self.stash_engine_message(other),
}
Ok(())
}
/// Stashes a beacon message (newPayload / forkchoiceUpdated). Stamps it with `stashed_at`
/// so we can measure how long it waited once it is eventually processed.
fn stash_beacon_message(&mut self, message: BeaconEngineMessage<T>) {
self.stashed_engine_messages
.push_back(StashedEngineMessage::Beacon { stashed_at: Instant::now(), message });
self.update_backpressure_stash_len_metric();
}
/// Stashes a non-beacon engine message (e.g. downloaded blocks). These don't carry a
/// timestamp because we don't track backpressure wait times for non-beacon messages.
fn stash_engine_message(&mut self, message: FromEngine<EngineApiRequest<T, N>, N::Block>) {
self.stashed_engine_messages.push_back(StashedEngineMessage::Other(message));
self.update_backpressure_stash_len_metric();
}
/// How many blocks the canonical tip is ahead of the last persisted block. A large gap means
/// persistence is falling behind execution.
const fn persistence_gap(&self) -> u64 {
self.state
.tree_state
.canonical_block_number()
.saturating_sub(self.persistence_state.last_persisted_block.number)
}
/// Returns `true` when the main loop should stall processing of stashed beacon messages.
///
/// This is the case when there are messages waiting AND the persistence gap has reached the
/// configured threshold — meaning we've accumulated enough unpersisted blocks that we need
/// to let persistence catch up before executing more.
fn should_backpressure(&self) -> bool {
!self.stashed_engine_messages.is_empty() &&
self.persistence_gap() >= self.config.persistence_backpressure_threshold()
}
fn try_process_stashed_engine_message(
&mut self,
) -> Result<Option<ops::ControlFlow<()>>, InsertBlockFatalError> {
if self.should_backpressure() {
return Ok(None);
}
let Some(queued) = self.stashed_engine_messages.pop_front() else {
return Ok(None);
};
self.update_backpressure_stash_len_metric();
match queued {
StashedEngineMessage::Beacon { stashed_at, message } => {
let wait = stashed_at.elapsed();
match &message {
BeaconEngineMessage::NewPayload { .. } |
BeaconEngineMessage::RethNewPayload { .. } => {
self.metrics.engine.new_payload_backpressure_wait_seconds.record(wait);
}
BeaconEngineMessage::ForkchoiceUpdated { .. } => {
self.metrics.engine.fcu_backpressure_wait_seconds.record(wait);
}
}
self.process_beacon_message(message, Some(wait)).map(Some)
}
StashedEngineMessage::Other(message) => self.on_engine_message(message).map(Some),
}
}
/// Run the engine API handler.
///
/// This will block the current thread and process incoming messages.
pub fn run(mut self) {
loop {
match self.wait_for_event() {
// Persistence backpressure: beacon messages (newPayload, forkchoiceUpdated) are
// not processed inline - they are always pushed into `stashed_engine_messages`
// first (see `on_engine_message`). Before we block-wait for new events, we try to
// make progress on two fronts:
//
// 1. Poll for persistence completion (non-blocking). If a background flush finished,
// handle it and restart the loop - this shrinks the gap between the canonical tip
// and the last persisted block.
//
// 2. Try to drain one stashed message. This only succeeds when the persistence gap is
// below `persistence_backpressure_threshold`; otherwise the stash stays blocked and
// we skip to the wait below.
//
// If both checks fall through without doing work, we need to wait for an external
// event. The wait strategy depends on whether we are backpressured:
//
// - Backpressured (gap >= threshold, stash non-empty): we call
// `wait_for_persistence_event` which blocks until persistence completes. Any
// incoming messages that arrive in the meantime are stashed, not processed. This is
// what creates the actual back-pressure — the CL's requests sit in the stash and
// their response channels stay open until we catch up.
//
// - Normal: we call `wait_for_event` which accepts whichever channel fires first —
// persistence completion or incoming message.
match self.try_poll_persistence_completion() {
Ok(true) => {
if let Err(err) = self.advance_persistence() {
error!(target: "engine::tree", %err, "Advancing persistence failed");
return
}
continue;
}
Ok(false) => {}
Err(err) => {
error!(target: "engine::tree", %err, "Polling persistence failed");
return
}
}
match self.try_process_stashed_engine_message() {
Ok(Some(ops::ControlFlow::Break(()))) => return,
Ok(Some(ops::ControlFlow::Continue(()))) => {
if let Err(err) = self.advance_persistence() {
error!(target: "engine::tree", %err, "Advancing persistence failed");
return
}
continue;
}
Ok(None) => {}
Err(fatal) => {
error!(target: "engine::tree", %fatal, "insert block fatal error");
return
}
}
let event = if self.should_backpressure() {
if let Err(err) = self.advance_persistence() {
error!(target: "engine::tree", %err, "Advancing persistence failed");
return
}
match self.wait_for_persistence_event() {
Ok(event) => event,
Err(fatal) => {
error!(target: "engine::tree", %fatal, "insert block fatal error");
return;
}
}
} else {
self.wait_for_event()
};
match event {
LoopEvent::EngineMessage(msg) => {
debug!(target: "engine::tree", %msg, "received new engine message");
match self.on_engine_message(msg) {
@@ -512,6 +685,39 @@ where
}
}
/// Blocks until a persistence task completes, used when we are under backpressure.
///
/// While waiting, incoming messages continue to be read from the channel (so senders don't
/// block) but they are stashed rather than processed. The only event that can break out of
/// this wait is persistence completion — that's what makes it "backpressure": we refuse to
/// do new work until persistence catches up.
///
/// Falls back to the normal `wait_for_event` if no persistence task is in flight.
fn wait_for_persistence_event(&mut self) -> Result<LoopEvent<T, N>, InsertBlockFatalError> {
let maybe_persistence = self.persistence_state.rx.take();
if let Some((persistence_rx, start_time, _action)) = maybe_persistence {
loop {
crossbeam_channel::select_biased! {
recv(persistence_rx) -> result => {
return Ok(match result {
Ok(result) => LoopEvent::PersistenceComplete { result, start_time },
Err(_) => LoopEvent::Disconnected,
})
}
recv(self.incoming) -> msg => {
match msg {
Ok(message) => self.stash_incoming_message(message)?,
Err(_) => return Ok(LoopEvent::Disconnected),
}
}
}
}
} else {
Ok(self.wait_for_event())
}
}
/// Blocks until the next event is ready: either an incoming engine message or a persistence
/// completion (if one is in progress).
///
@@ -1341,8 +1547,7 @@ where
/// Tries to poll for a completed persistence task (non-blocking).
///
/// Returns `true` if a persistence task was completed, `false` otherwise.
#[cfg(test)]
pub fn try_poll_persistence(&mut self) -> Result<bool, AdvancePersistenceError> {
fn try_poll_persistence_completion(&mut self) -> Result<bool, AdvancePersistenceError> {
let Some((rx, start_time, action)) = self.persistence_state.rx.take() else {
return Ok(false);
};
@@ -1363,6 +1568,11 @@ where
}
}
#[cfg(test)]
fn try_poll_persistence(&mut self) -> Result<bool, AdvancePersistenceError> {
self.try_poll_persistence_completion()
}
/// Handles a completed persistence task.
fn on_persistence_complete(
&mut self,
@@ -1479,165 +1689,7 @@ where
));
}
EngineApiRequest::Beacon(request) => {
match request {
BeaconEngineMessage::ForkchoiceUpdated { state, payload_attrs, tx } => {
let has_attrs = payload_attrs.is_some();
let start = Instant::now();
let mut output = self.on_forkchoice_updated(state, payload_attrs);
if let Ok(res) = &mut output {
// track last received forkchoice state
self.state
.forkchoice_state_tracker
.set_latest(state, res.outcome.forkchoice_status());
// emit an event about the handled FCU
self.emit_event(ConsensusEngineEvent::ForkchoiceUpdated(
state,
res.outcome.forkchoice_status(),
));
// handle the event if any
self.on_maybe_tree_event(res.event.take())?;
}
if let Err(ref err) = output {
error!(target: "engine::tree", %err, ?state, "Error processing forkchoice update");
}
self.metrics.engine.forkchoice_updated.update_response_metrics(
start,
&mut self.metrics.engine.new_payload.latest_finish_at,
has_attrs,
&output,
);
if let Err(err) =
tx.send(output.map(|o| o.outcome).map_err(Into::into))
{
self.metrics
.engine
.failed_forkchoice_updated_response_deliveries
.increment(1);
warn!(target: "engine::tree", ?state, elapsed=?start.elapsed(), "Failed to deliver forkchoiceUpdated response, receiver dropped (request cancelled): {err:?}");
}
}
BeaconEngineMessage::NewPayload { payload, tx } => {
let start = Instant::now();
let gas_used = payload.gas_used();
let num_hash = payload.num_hash();
let mut output = self.on_new_payload(payload);
self.metrics.engine.new_payload.update_response_metrics(
start,
&mut self.metrics.engine.forkchoice_updated.latest_finish_at,
&output,
gas_used,
);
let maybe_event =
output.as_mut().ok().and_then(|out| out.event.take());
// emit response
if let Err(err) =
tx.send(output.map(|o| o.outcome).map_err(|e| {
BeaconOnNewPayloadError::Internal(Box::new(e))
}))
{
warn!(target: "engine::tree", payload=?num_hash, elapsed=?start.elapsed(), "Failed to deliver newPayload response, receiver dropped (request cancelled): {err:?}");
self.metrics
.engine
.failed_new_payload_response_deliveries
.increment(1);
}
// handle the event if any
self.on_maybe_tree_event(maybe_event)?;
}
BeaconEngineMessage::RethNewPayload {
payload,
wait_for_persistence,
wait_for_caches,
tx,
} => {
debug!(
target: "engine::tree",
wait_for_persistence,
wait_for_caches,
"Processing reth_newPayload"
);
let persistence_wait = if wait_for_persistence {
let pending_persistence = self.persistence_state.rx.take();
if let Some((rx, start_time, _action)) = pending_persistence {
let (persistence_tx, persistence_rx) =
std::sync::mpsc::channel();
self.runtime.spawn_blocking_named(
"wait-persist",
move || {
let start = Instant::now();
let result = rx
.recv()
.expect("persistence state channel closed");
let _ = persistence_tx.send((
result,
start_time,
start.elapsed(),
));
},
);
let (result, start_time, wait_duration) = persistence_rx
.recv()
.expect("persistence result channel closed");
let _ = self.on_persistence_complete(result, start_time);
Some(wait_duration)
} else {
Some(Duration::ZERO)
}
} else {
None
};
let cache_wait = wait_for_caches
.then(|| self.payload_validator.wait_for_caches());
let start = Instant::now();
let gas_used = payload.gas_used();
let num_hash = payload.num_hash();
let mut output = self.on_new_payload(payload);
let latency = start.elapsed();
self.metrics.engine.new_payload.update_response_metrics(
start,
&mut self.metrics.engine.forkchoice_updated.latest_finish_at,
&output,
gas_used,
);
let maybe_event =
output.as_mut().ok().and_then(|out| out.event.take());
let timings = NewPayloadTimings {
latency,
persistence_wait,
execution_cache_wait: cache_wait
.map(|wait| wait.execution_cache),
sparse_trie_wait: cache_wait.map(|wait| wait.sparse_trie),
};
if let Err(err) =
tx.send(output.map(|o| (o.outcome, timings)).map_err(|e| {
BeaconOnNewPayloadError::Internal(Box::new(e))
}))
{
error!(target: "engine::tree", payload=?num_hash, elapsed=?start.elapsed(), "Failed to send event: {err:?}");
self.metrics
.engine
.failed_new_payload_response_deliveries
.increment(1);
}
self.on_maybe_tree_event(maybe_event)?;
}
}
self.stash_beacon_message(request);
}
}
}
@@ -1650,6 +1702,144 @@ where
Ok(ops::ControlFlow::Continue(()))
}
fn process_beacon_message(
&mut self,
request: BeaconEngineMessage<T>,
backpressure_wait: Option<Duration>,
) -> Result<ops::ControlFlow<()>, InsertBlockFatalError> {
match request {
BeaconEngineMessage::ForkchoiceUpdated { state, payload_attrs, tx } => {
let has_attrs = payload_attrs.is_some();
let start = Instant::now();
let mut output = self.on_forkchoice_updated(state, payload_attrs);
if let Ok(res) = &mut output {
self.state
.forkchoice_state_tracker
.set_latest(state, res.outcome.forkchoice_status());
self.emit_event(ConsensusEngineEvent::ForkchoiceUpdated(
state,
res.outcome.forkchoice_status(),
));
self.on_maybe_tree_event(res.event.take())?;
}
if let Err(ref err) = output {
error!(target: "engine::tree", %err, ?state, "Error processing forkchoice update");
}
self.metrics.engine.forkchoice_updated.update_response_metrics(
start,
&mut self.metrics.engine.new_payload.latest_finish_at,
has_attrs,
&output,
);
if let Err(err) = tx.send(output.map(|o| o.outcome).map_err(Into::into)) {
self.metrics.engine.failed_forkchoice_updated_response_deliveries.increment(1);
warn!(target: "engine::tree", ?state, elapsed=?start.elapsed(), "Failed to deliver forkchoiceUpdated response, receiver dropped (request cancelled): {err:?}");
}
}
BeaconEngineMessage::NewPayload { payload, tx } => {
let start = Instant::now();
let gas_used = payload.gas_used();
let num_hash = payload.num_hash();
let mut output = self.on_new_payload(payload);
self.metrics.engine.new_payload.update_response_metrics(
start,
&mut self.metrics.engine.forkchoice_updated.latest_finish_at,
&output,
gas_used,
);
let maybe_event = output.as_mut().ok().and_then(|out| out.event.take());
if let Err(err) = tx.send(
output
.map(|o| o.outcome)
.map_err(|e| BeaconOnNewPayloadError::Internal(Box::new(e))),
) {
warn!(target: "engine::tree", payload=?num_hash, elapsed=?start.elapsed(), "Failed to deliver newPayload response, receiver dropped (request cancelled): {err:?}");
self.metrics.engine.failed_new_payload_response_deliveries.increment(1);
}
self.on_maybe_tree_event(maybe_event)?;
}
BeaconEngineMessage::RethNewPayload {
payload,
wait_for_persistence,
wait_for_caches,
tx,
} => {
debug!(
target: "engine::tree",
wait_for_persistence,
wait_for_caches,
"Processing reth_newPayload"
);
let persistence_wait = if wait_for_persistence {
let pending_persistence = self.persistence_state.rx.take();
if let Some((rx, start_time, _action)) = pending_persistence {
let (persistence_tx, persistence_rx) = std::sync::mpsc::channel();
self.runtime.spawn_blocking_named("wait-persist", move || {
let start = Instant::now();
let result = rx.recv().expect("persistence state channel closed");
let _ = persistence_tx.send((result, start_time, start.elapsed()));
});
let (result, start_time, wait_duration) =
persistence_rx.recv().expect("persistence result channel closed");
let _ = self.on_persistence_complete(result, start_time);
Some(wait_duration)
} else {
Some(Duration::ZERO)
}
} else {
None
};
let cache_wait = wait_for_caches.then(|| self.payload_validator.wait_for_caches());
let start = Instant::now();
let gas_used = payload.gas_used();
let num_hash = payload.num_hash();
let mut output = self.on_new_payload(payload);
let latency = start.elapsed();
self.metrics.engine.new_payload.update_response_metrics(
start,
&mut self.metrics.engine.forkchoice_updated.latest_finish_at,
&output,
gas_used,
);
let maybe_event = output.as_mut().ok().and_then(|out| out.event.take());
let timings = NewPayloadTimings {
latency,
backpressure_wait,
persistence_wait,
execution_cache_wait: cache_wait.map(|wait| wait.execution_cache),
sparse_trie_wait: cache_wait.map(|wait| wait.sparse_trie),
};
if let Err(err) = tx.send(
output
.map(|o| (o.outcome, timings))
.map_err(|e| BeaconOnNewPayloadError::Internal(Box::new(e))),
) {
error!(target: "engine::tree", payload=?num_hash, elapsed=?start.elapsed(), "Failed to send event: {err:?}");
self.metrics.engine.failed_new_payload_response_deliveries.increment(1);
}
self.on_maybe_tree_event(maybe_event)?;
}
}
Ok(ops::ControlFlow::Continue(()))
}
/// Invoked if the backfill sync has finished to target.
///
/// At this point we consider the block synced to the backfill target.

View File

@@ -12,7 +12,7 @@ use reth_trie_db::ChangesetCache;
use alloy_eips::eip1898::BlockWithParent;
use alloy_primitives::{
map::{B256Map, B256Set},
Bytes, B256,
Address, Bytes, B256,
};
use alloy_rlp::Decodable;
use alloy_rpc_types_engine::{
@@ -286,6 +286,11 @@ impl TestHarness {
self
}
fn process_next_stashed_message(&mut self) {
let processed = self.tree.try_process_stashed_engine_message().unwrap();
assert!(processed.is_some(), "expected a stashed message to be processed");
}
async fn fcu_to(&mut self, block_hash: B256, fcu_status: impl Into<ForkchoiceStatus>) {
let fcu_status = fcu_status.into();
@@ -309,6 +314,7 @@ impl TestHarness {
.into(),
))
.unwrap();
self.process_next_stashed_message();
let response = rx.await.unwrap().unwrap().await.unwrap();
match fcu_status.into() {
@@ -608,6 +614,7 @@ async fn test_engine_request_during_backfill() {
.into(),
))
.unwrap();
test_harness.process_next_stashed_message();
let resp = rx.await.unwrap().unwrap().await.unwrap();
assert!(resp.payload_status.is_syncing());
@@ -686,11 +693,227 @@ async fn test_holesky_payload() {
.into(),
))
.unwrap();
test_harness.process_next_stashed_message();
let resp = rx.await.unwrap().unwrap();
assert!(resp.is_syncing());
}
#[test]
fn test_stashed_beacon_message_processes_below_backpressure_threshold() {
let mut test_harness = TestHarness::new(MAINNET.clone());
test_harness.tree.config = test_harness
.tree
.config
.with_persistence_threshold(0)
.with_persistence_backpressure_threshold(1);
let (tx, mut rx) = oneshot::channel();
let _ = test_harness
.tree
.on_engine_message(FromEngine::Request(
BeaconEngineMessage::ForkchoiceUpdated {
state: ForkchoiceState {
head_block_hash: B256::random(),
safe_block_hash: B256::random(),
finalized_block_hash: B256::random(),
},
payload_attrs: None,
tx,
}
.into(),
))
.unwrap();
assert_eq!(test_harness.tree.stashed_engine_messages.len(), 1);
assert!(!test_harness.tree.should_backpressure());
test_harness.process_next_stashed_message();
assert!(rx.try_recv().is_ok(), "expected stashed response after processing");
assert!(test_harness.tree.stashed_engine_messages.is_empty());
}
#[test]
fn test_stashed_beacon_message_stays_stashed_while_backpressured() {
let blocks: Vec<_> = TestBlockBuilder::eth().get_executed_blocks(1..4).collect();
let mut test_harness = TestHarness::new(MAINNET.clone()).with_blocks(blocks.clone());
test_harness.tree.config = test_harness
.tree
.config
.with_persistence_threshold(0)
.with_persistence_backpressure_threshold(1);
let (_persist_tx, persist_rx) = crossbeam_channel::bounded(1);
test_harness
.tree
.persistence_state
.start_save(blocks.last().unwrap().recovered_block().num_hash(), persist_rx);
let (tx, _rx) = oneshot::channel();
let _ = test_harness
.tree
.on_engine_message(FromEngine::Request(
BeaconEngineMessage::ForkchoiceUpdated {
state: ForkchoiceState {
head_block_hash: B256::random(),
safe_block_hash: B256::random(),
finalized_block_hash: B256::random(),
},
payload_attrs: None,
tx,
}
.into(),
))
.unwrap();
assert!(test_harness.tree.should_backpressure());
assert!(test_harness.tree.try_process_stashed_engine_message().unwrap().is_none());
assert_eq!(test_harness.tree.stashed_engine_messages.len(), 1);
}
#[test]
fn test_backpressure_waits_for_persistence_before_reading_incoming() {
let blocks: Vec<_> = TestBlockBuilder::eth().get_executed_blocks(1..4).collect();
let mut test_harness = TestHarness::new(MAINNET.clone()).with_blocks(blocks.clone());
test_harness.tree.config = test_harness
.tree
.config
.with_persistence_threshold(0)
.with_persistence_backpressure_threshold(1);
let (persist_tx, persist_rx) = crossbeam_channel::bounded(1);
let persisted = blocks.last().unwrap().recovered_block().num_hash();
test_harness.tree.persistence_state.start_save(persisted, persist_rx);
let (tx, _rx) = oneshot::channel();
let _ = test_harness
.tree
.on_engine_message(FromEngine::Request(
BeaconEngineMessage::ForkchoiceUpdated {
state: ForkchoiceState {
head_block_hash: B256::random(),
safe_block_hash: B256::random(),
finalized_block_hash: B256::random(),
},
payload_attrs: None,
tx,
}
.into(),
))
.unwrap();
test_harness.to_tree_tx.send(FromEngine::DownloadedBlocks(vec![])).unwrap();
std::thread::spawn(move || {
std::thread::sleep(Duration::from_millis(10));
persist_tx
.send(PersistenceResult {
last_block: Some(persisted),
commit_duration: Some(Duration::ZERO),
})
.unwrap();
});
let event = test_harness.tree.wait_for_persistence_event().unwrap();
assert!(matches!(event, super::LoopEvent::PersistenceComplete { .. }));
assert_eq!(test_harness.tree.stashed_engine_messages.len(), 2);
let super::LoopEvent::PersistenceComplete { result, start_time } = event else {
unreachable!()
};
test_harness.tree.on_persistence_complete(result, start_time).unwrap();
test_harness.process_next_stashed_message();
assert!(test_harness.tree.stashed_engine_messages.len() == 1);
test_harness.process_next_stashed_message();
assert!(test_harness.tree.stashed_engine_messages.is_empty());
}
#[test]
fn test_backpressure_handler_enqueues_attrs_fcu() {
let blocks: Vec<_> = TestBlockBuilder::eth().get_executed_blocks(1..2).collect();
let mut test_harness = TestHarness::new(MAINNET.clone()).with_blocks(blocks);
test_harness.tree.config = test_harness
.tree
.config
.clone()
.with_persistence_threshold(0)
.with_persistence_backpressure_threshold(1)
.with_always_process_payload_attributes_on_canonical_head(true);
let (_persist_tx, persist_rx) = crossbeam_channel::bounded(1);
let persisted = test_harness.tree.state.tree_state.current_canonical_head;
test_harness.tree.persistence_state.start_save(persisted, persist_rx);
let head = test_harness.tree.canonical_in_memory_state.get_canonical_head();
let (tx, mut rx) = oneshot::channel();
test_harness
.tree
.stash_incoming_message(FromEngine::Request(
BeaconEngineMessage::ForkchoiceUpdated {
state: ForkchoiceState {
head_block_hash: head.hash(),
safe_block_hash: B256::ZERO,
finalized_block_hash: B256::ZERO,
},
payload_attrs: Some(alloy_rpc_types_engine::PayloadAttributes {
timestamp: head.timestamp().saturating_add(1),
prev_randao: B256::ZERO,
suggested_fee_recipient: Address::ZERO,
withdrawals: None,
parent_beacon_block_root: None,
}),
tx,
}
.into(),
))
.unwrap();
assert_eq!(test_harness.tree.stashed_engine_messages.len(), 1);
assert!(rx.try_recv().is_err());
}
#[tokio::test]
async fn test_reth_new_payload_reports_backpressure_wait() {
let s = include_str!("../../test-data/holesky/2.rlp");
let data = Bytes::from_str(s).unwrap();
let block = Block::decode(&mut data.as_ref()).unwrap();
let sealed = block.seal_slow();
let hash = sealed.hash();
let block = sealed.into_block();
let payload = ExecutionPayloadV1::from_block_unchecked(hash, &block);
let mut test_harness = TestHarness::new(HOLESKY.clone());
let (tx, rx) = oneshot::channel();
let _ = test_harness
.tree
.on_engine_message(FromEngine::Request(
BeaconEngineMessage::RethNewPayload {
payload: ExecutionData {
payload: payload.into(),
sidecar: ExecutionPayloadSidecar::none(),
},
wait_for_persistence: false,
wait_for_caches: false,
tx,
}
.into(),
))
.unwrap();
// `backpressure_wait` is measured as `stashed_at.elapsed()` when the queued
// message is drained. Without a small delay here, stashing and draining can
// happen in the same clock tick and legitimately report `Duration::ZERO`,
// which would make the `> Duration::ZERO` assertion below flaky.
std::thread::sleep(Duration::from_millis(10));
test_harness.process_next_stashed_message();
let (_, timings) = rx.await.unwrap().unwrap();
assert!(timings.backpressure_wait.is_some());
assert!(timings.backpressure_wait.unwrap() > Duration::ZERO);
assert_eq!(timings.persistence_wait, None);
}
#[tokio::test]
async fn test_tree_state_on_new_head_reorg() {
reth_tracing::init_test_tracing();
@@ -1093,6 +1316,7 @@ async fn test_fcu_with_canonical_ancestor_updates_latest_block() {
.into(),
))
.unwrap();
test_harness.process_next_stashed_message();
// Verify FCU succeeds
let response = rx.await.unwrap().unwrap().await.unwrap();

View File

@@ -13,6 +13,11 @@ pub struct RethPayloadStatus {
pub status: PayloadStatus,
/// Server-side execution latency in microseconds.
pub latency_us: u64,
/// Time spent waiting in the backpressure queue before processing, in microseconds.
///
/// `None` when the message was not queued.
#[serde(skip_serializing_if = "Option::is_none")]
pub backpressure_wait_us: Option<u64>,
/// Time spent waiting for persistence to complete, in microseconds.
///
/// `None` when wasn't asked to wait.

View File

@@ -55,6 +55,7 @@ impl<Payload: PayloadTypes> RethEngineApiServer<Payload::ExecutionData> for Reth
Ok(RethPayloadStatus {
status,
latency_us: timings.latency.as_micros() as u64,
backpressure_wait_us: timings.backpressure_wait.map(|d| d.as_micros() as u64),
persistence_wait_us: timings.persistence_wait.map(|d| d.as_micros() as u64),
execution_cache_wait_us: timings.execution_cache_wait.map(|d| d.as_micros() as u64),
sparse_trie_wait_us: timings.sparse_trie_wait.map(|d| d.as_micros() as u64),