mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-04-30 03:01:58 -04:00
Compare commits
5 Commits
devnet4
...
pep/backpr
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
5732c3ce5f | ||
|
|
32c55d3c48 | ||
|
|
624a1eb428 | ||
|
|
8fbcf7f78b | ||
|
|
b37a77f945 |
@@ -216,6 +216,7 @@ impl Command {
|
||||
let new_payload_result = NewPayloadResult {
|
||||
gas_used,
|
||||
latency: np_latency,
|
||||
backpressure_wait: server_timings.as_ref().and_then(|t| t.backpressure_wait),
|
||||
persistence_wait: server_timings.as_ref().and_then(|t| t.persistence_wait),
|
||||
execution_cache_wait: server_timings
|
||||
.as_ref()
|
||||
|
||||
@@ -142,6 +142,7 @@ impl Command {
|
||||
let new_payload_result = NewPayloadResult {
|
||||
gas_used,
|
||||
latency,
|
||||
backpressure_wait: server_timings.as_ref().and_then(|t| t.backpressure_wait),
|
||||
persistence_wait: server_timings.as_ref().and_then(|t| t.persistence_wait),
|
||||
execution_cache_wait: server_timings
|
||||
.as_ref()
|
||||
|
||||
@@ -25,6 +25,8 @@ pub(crate) struct NewPayloadResult {
|
||||
pub(crate) gas_used: u64,
|
||||
/// The latency of the `newPayload` call.
|
||||
pub(crate) latency: Duration,
|
||||
/// Time spent waiting in the tree backpressure queue. `None` when the message was not queued.
|
||||
pub(crate) backpressure_wait: Option<Duration>,
|
||||
/// Time spent waiting for persistence. `None` when no persistence was in-flight.
|
||||
pub(crate) persistence_wait: Option<Duration>,
|
||||
/// Time spent waiting for execution cache lock.
|
||||
@@ -61,9 +63,11 @@ impl Serialize for NewPayloadResult {
|
||||
{
|
||||
// convert the time to microseconds
|
||||
let time = self.latency.as_micros();
|
||||
let mut state = serializer.serialize_struct("NewPayloadResult", 5)?;
|
||||
let mut state = serializer.serialize_struct("NewPayloadResult", 6)?;
|
||||
state.serialize_field("gas_used", &self.gas_used)?;
|
||||
state.serialize_field("latency", &time)?;
|
||||
state
|
||||
.serialize_field("backpressure_wait", &self.backpressure_wait.map(|d| d.as_micros()))?;
|
||||
state.serialize_field("persistence_wait", &self.persistence_wait.map(|d| d.as_micros()))?;
|
||||
state.serialize_field("execution_cache_wait", &self.execution_cache_wait.as_micros())?;
|
||||
state.serialize_field("sparse_trie_wait", &self.sparse_trie_wait.as_micros())?;
|
||||
@@ -119,6 +123,9 @@ impl std::fmt::Display for CombinedResult {
|
||||
if let Some(d) = np.persistence_wait {
|
||||
write!(f, ", persistence wait: {d:?}")?;
|
||||
}
|
||||
if let Some(d) = np.backpressure_wait {
|
||||
write!(f, ", backpressure wait: {d:?}")?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@@ -134,7 +141,7 @@ impl Serialize for CombinedResult {
|
||||
let fcu_latency = self.fcu_latency.as_micros();
|
||||
let new_payload_latency = self.new_payload_result.latency.as_micros();
|
||||
let total_latency = self.total_latency.as_micros();
|
||||
let mut state = serializer.serialize_struct("CombinedResult", 10)?;
|
||||
let mut state = serializer.serialize_struct("CombinedResult", 11)?;
|
||||
|
||||
// flatten the new payload result because this is meant for CSV writing
|
||||
state.serialize_field("block_number", &self.block_number)?;
|
||||
@@ -144,6 +151,10 @@ impl Serialize for CombinedResult {
|
||||
state.serialize_field("new_payload_latency", &new_payload_latency)?;
|
||||
state.serialize_field("fcu_latency", &fcu_latency)?;
|
||||
state.serialize_field("total_latency", &total_latency)?;
|
||||
state.serialize_field(
|
||||
"backpressure_wait",
|
||||
&self.new_payload_result.backpressure_wait.map(|d| d.as_micros()),
|
||||
)?;
|
||||
state.serialize_field(
|
||||
"persistence_wait",
|
||||
&self.new_payload_result.persistence_wait.map(|d| d.as_micros()),
|
||||
@@ -315,4 +326,37 @@ mod tests {
|
||||
let second_line = result.next().unwrap().unwrap();
|
||||
assert_eq!(second_line, expected_second_line);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_write_combined_result_csv_includes_backpressure_wait() {
|
||||
let result = CombinedResult {
|
||||
block_number: 1,
|
||||
gas_limit: 30_000_000,
|
||||
transaction_count: 10,
|
||||
new_payload_result: NewPayloadResult {
|
||||
gas_used: 1_000,
|
||||
latency: Duration::from_micros(2_000),
|
||||
backpressure_wait: Some(Duration::from_micros(300)),
|
||||
persistence_wait: Some(Duration::from_micros(400)),
|
||||
execution_cache_wait: Duration::from_micros(500),
|
||||
sparse_trie_wait: Duration::from_micros(600),
|
||||
},
|
||||
fcu_latency: Duration::from_micros(700),
|
||||
total_latency: Duration::from_micros(2_700),
|
||||
};
|
||||
|
||||
let mut writer = Writer::from_writer(vec![]);
|
||||
writer.serialize(result).unwrap();
|
||||
let result = writer.into_inner().unwrap();
|
||||
|
||||
let mut result = result.as_slice().lines();
|
||||
|
||||
let expected_first_line = "block_number,gas_limit,transaction_count,gas_used,new_payload_latency,fcu_latency,total_latency,backpressure_wait,persistence_wait,execution_cache_wait,sparse_trie_wait";
|
||||
let first_line = result.next().unwrap().unwrap();
|
||||
assert_eq!(first_line, expected_first_line);
|
||||
|
||||
let expected_second_line = "1,30000000,10,1000,2000,700,2700,300,400,500,600";
|
||||
let second_line = result.next().unwrap().unwrap();
|
||||
assert_eq!(second_line, expected_second_line);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -264,6 +264,7 @@ impl Command {
|
||||
let new_payload_result = NewPayloadResult {
|
||||
gas_used,
|
||||
latency: np_latency,
|
||||
backpressure_wait: server_timings.as_ref().and_then(|t| t.backpressure_wait),
|
||||
persistence_wait: server_timings.as_ref().and_then(|t| t.persistence_wait),
|
||||
execution_cache_wait: server_timings
|
||||
.as_ref()
|
||||
|
||||
@@ -313,6 +313,8 @@ pub(crate) async fn call_new_payload<N: Network, P: Provider<N>>(
|
||||
struct RethPayloadStatus {
|
||||
latency_us: u64,
|
||||
#[serde(default)]
|
||||
backpressure_wait_us: Option<u64>,
|
||||
#[serde(default)]
|
||||
persistence_wait_us: Option<u64>,
|
||||
#[serde(default)]
|
||||
execution_cache_wait_us: u64,
|
||||
@@ -325,6 +327,8 @@ struct RethPayloadStatus {
|
||||
pub(crate) struct NewPayloadTimingBreakdown {
|
||||
/// Server-side execution latency.
|
||||
pub(crate) latency: Duration,
|
||||
/// Time spent waiting in the backpressure queue. `None` when the message was not queued.
|
||||
pub(crate) backpressure_wait: Option<Duration>,
|
||||
/// Time spent waiting for persistence. `None` when no persistence was in-flight.
|
||||
pub(crate) persistence_wait: Option<Duration>,
|
||||
/// Time spent waiting for execution cache lock.
|
||||
@@ -374,6 +378,7 @@ pub(crate) async fn call_new_payload_with_reth<N: Network, P: Provider<N>>(
|
||||
|
||||
Ok(Some(NewPayloadTimingBreakdown {
|
||||
latency: Duration::from_micros(resp.latency_us),
|
||||
backpressure_wait: resp.backpressure_wait_us.map(Duration::from_micros),
|
||||
persistence_wait: resp.persistence_wait_us.map(Duration::from_micros),
|
||||
execution_cache_wait: Duration::from_micros(resp.execution_cache_wait_us),
|
||||
sparse_trie_wait: Duration::from_micros(resp.sparse_trie_wait_us),
|
||||
|
||||
@@ -6,6 +6,9 @@ use core::time::Duration;
|
||||
/// Triggers persistence when the number of canonical blocks in memory exceeds this threshold.
|
||||
pub const DEFAULT_PERSISTENCE_THRESHOLD: u64 = 2;
|
||||
|
||||
/// Maximum canonical-minus-persisted gap before engine API processing is stalled.
|
||||
pub const DEFAULT_PERSISTENCE_BACKPRESSURE_THRESHOLD: u64 = 16;
|
||||
|
||||
/// How close to the canonical head we persist blocks.
|
||||
pub const DEFAULT_MEMORY_BLOCK_BUFFER_TARGET: u64 = 0;
|
||||
|
||||
@@ -44,6 +47,16 @@ const DEFAULT_MAX_INVALID_HEADER_CACHE_LENGTH: u32 = 256;
|
||||
const DEFAULT_MAX_EXECUTE_BLOCK_BATCH_SIZE: usize = 4;
|
||||
const DEFAULT_CROSS_BLOCK_CACHE_SIZE: usize = default_cross_block_cache_size();
|
||||
|
||||
const fn assert_backpressure_threshold_invariant(
|
||||
persistence_threshold: u64,
|
||||
persistence_backpressure_threshold: u64,
|
||||
) {
|
||||
debug_assert!(
|
||||
persistence_backpressure_threshold > persistence_threshold,
|
||||
"persistence_backpressure_threshold must be greater than persistence_threshold",
|
||||
);
|
||||
}
|
||||
|
||||
const fn default_cross_block_cache_size() -> usize {
|
||||
if cfg!(test) {
|
||||
1024 * 1024 // 1 MB in tests
|
||||
@@ -82,6 +95,8 @@ pub struct TreeConfig {
|
||||
///
|
||||
/// Note: this should be less than or equal to `persistence_threshold`.
|
||||
memory_block_buffer_target: u64,
|
||||
/// Maximum canonical-minus-persisted gap before engine API processing is stalled.
|
||||
persistence_backpressure_threshold: u64,
|
||||
/// Number of pending blocks that cannot be executed due to missing parent and
|
||||
/// are kept in cache.
|
||||
block_buffer_limit: u32,
|
||||
@@ -162,9 +177,14 @@ pub struct TreeConfig {
|
||||
|
||||
impl Default for TreeConfig {
|
||||
fn default() -> Self {
|
||||
assert_backpressure_threshold_invariant(
|
||||
DEFAULT_PERSISTENCE_THRESHOLD,
|
||||
DEFAULT_PERSISTENCE_BACKPRESSURE_THRESHOLD,
|
||||
);
|
||||
Self {
|
||||
persistence_threshold: DEFAULT_PERSISTENCE_THRESHOLD,
|
||||
memory_block_buffer_target: DEFAULT_MEMORY_BLOCK_BUFFER_TARGET,
|
||||
persistence_backpressure_threshold: DEFAULT_PERSISTENCE_BACKPRESSURE_THRESHOLD,
|
||||
block_buffer_limit: DEFAULT_BLOCK_BUFFER_LIMIT,
|
||||
max_invalid_header_cache_length: DEFAULT_MAX_INVALID_HEADER_CACHE_LENGTH,
|
||||
max_execute_block_batch_size: DEFAULT_MAX_EXECUTE_BLOCK_BATCH_SIZE,
|
||||
@@ -201,6 +221,7 @@ impl TreeConfig {
|
||||
pub const fn new(
|
||||
persistence_threshold: u64,
|
||||
memory_block_buffer_target: u64,
|
||||
persistence_backpressure_threshold: u64,
|
||||
block_buffer_limit: u32,
|
||||
max_invalid_header_cache_length: u32,
|
||||
max_execute_block_batch_size: usize,
|
||||
@@ -225,9 +246,14 @@ impl TreeConfig {
|
||||
state_root_task_timeout: Option<Duration>,
|
||||
share_execution_cache_with_payload_builder: bool,
|
||||
) -> Self {
|
||||
assert_backpressure_threshold_invariant(
|
||||
persistence_threshold,
|
||||
persistence_backpressure_threshold,
|
||||
);
|
||||
Self {
|
||||
persistence_threshold,
|
||||
memory_block_buffer_target,
|
||||
persistence_backpressure_threshold,
|
||||
block_buffer_limit,
|
||||
max_invalid_header_cache_length,
|
||||
max_execute_block_batch_size,
|
||||
@@ -267,6 +293,11 @@ impl TreeConfig {
|
||||
self.memory_block_buffer_target
|
||||
}
|
||||
|
||||
/// Return the persistence backpressure threshold.
|
||||
pub const fn persistence_backpressure_threshold(&self) -> u64 {
|
||||
self.persistence_backpressure_threshold
|
||||
}
|
||||
|
||||
/// Return the block buffer limit.
|
||||
pub const fn block_buffer_limit(&self) -> u32 {
|
||||
self.block_buffer_limit
|
||||
@@ -363,6 +394,10 @@ impl TreeConfig {
|
||||
/// Setter for persistence threshold.
|
||||
pub const fn with_persistence_threshold(mut self, persistence_threshold: u64) -> Self {
|
||||
self.persistence_threshold = persistence_threshold;
|
||||
assert_backpressure_threshold_invariant(
|
||||
self.persistence_threshold,
|
||||
self.persistence_backpressure_threshold,
|
||||
);
|
||||
self
|
||||
}
|
||||
|
||||
@@ -375,6 +410,19 @@ impl TreeConfig {
|
||||
self
|
||||
}
|
||||
|
||||
/// Setter for persistence backpressure threshold.
|
||||
pub const fn with_persistence_backpressure_threshold(
|
||||
mut self,
|
||||
persistence_backpressure_threshold: u64,
|
||||
) -> Self {
|
||||
self.persistence_backpressure_threshold = persistence_backpressure_threshold;
|
||||
assert_backpressure_threshold_invariant(
|
||||
self.persistence_threshold,
|
||||
self.persistence_backpressure_threshold,
|
||||
);
|
||||
self
|
||||
}
|
||||
|
||||
/// Setter for block buffer limit.
|
||||
pub const fn with_block_buffer_limit(mut self, block_buffer_limit: u32) -> Self {
|
||||
self.block_buffer_limit = block_buffer_limit;
|
||||
@@ -592,3 +640,18 @@ impl TreeConfig {
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::TreeConfig;
|
||||
|
||||
#[test]
|
||||
#[should_panic(
|
||||
expected = "persistence_backpressure_threshold must be greater than persistence_threshold"
|
||||
)]
|
||||
fn rejects_backpressure_threshold_at_or_below_persistence_threshold() {
|
||||
let _ = TreeConfig::default()
|
||||
.with_persistence_threshold(4)
|
||||
.with_persistence_backpressure_threshold(4);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -402,10 +402,10 @@ mod tests {
|
||||
#[test]
|
||||
fn test_forkchoice_state_hash_find() {
|
||||
// Define example hashes
|
||||
let head_hash = B256::random();
|
||||
let safe_hash = B256::random();
|
||||
let finalized_hash = B256::random();
|
||||
let non_matching_hash = B256::random();
|
||||
let head_hash = B256::with_last_byte(1);
|
||||
let safe_hash = B256::with_last_byte(2);
|
||||
let finalized_hash = B256::with_last_byte(3);
|
||||
let non_matching_hash = B256::with_last_byte(4);
|
||||
|
||||
// Create a ForkchoiceState with specific hashes
|
||||
let state = ForkchoiceState {
|
||||
|
||||
@@ -148,6 +148,10 @@ impl Future for PendingPayloadId {
|
||||
pub struct NewPayloadTimings {
|
||||
/// Server-side execution latency.
|
||||
pub latency: Duration,
|
||||
/// Time spent waiting in the backpressure queue before processing.
|
||||
///
|
||||
/// `None` when the message was not queued.
|
||||
pub backpressure_wait: Option<Duration>,
|
||||
/// Time spent waiting for persistence to complete.
|
||||
///
|
||||
/// `None` when wasn't asked to wait for persistence.
|
||||
|
||||
@@ -171,6 +171,12 @@ pub struct EngineMetrics {
|
||||
pub(crate) executed_new_block_cache_miss: Counter,
|
||||
/// Histogram of persistence operation durations (in seconds)
|
||||
pub(crate) persistence_duration: Histogram,
|
||||
/// Histogram of time newPayload-style requests spend buffered in the backpressure queue.
|
||||
pub(crate) new_payload_backpressure_wait_seconds: Histogram,
|
||||
/// Histogram of time forkchoiceUpdated requests spend buffered in the backpressure queue.
|
||||
pub(crate) fcu_backpressure_wait_seconds: Histogram,
|
||||
/// Current number of buffered beacon messages in the tree-local queue.
|
||||
pub(crate) backpressure_buffer_len: Gauge,
|
||||
/// Tracks the how often we failed to deliver a newPayload response.
|
||||
///
|
||||
/// This effectively tracks how often the message sender dropped the channel and indicates a CL
|
||||
|
||||
@@ -40,7 +40,13 @@ use reth_tasks::{spawn_os_thread, utils::increase_thread_priority};
|
||||
use reth_trie_db::ChangesetCache;
|
||||
use revm::interpreter::debug_unreachable;
|
||||
use state::TreeState;
|
||||
use std::{collections::HashMap, fmt::Debug, ops, sync::Arc, time::Duration};
|
||||
use std::{
|
||||
collections::{HashMap, VecDeque},
|
||||
fmt::Debug,
|
||||
ops,
|
||||
sync::Arc,
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
use crossbeam_channel::{Receiver, Sender};
|
||||
use tokio::sync::{
|
||||
@@ -250,6 +256,12 @@ pub enum TreeAction {
|
||||
},
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
enum StashedEngineMessage<T: PayloadTypes, N: NodePrimitives> {
|
||||
Beacon { stashed_at: Instant, message: BeaconEngineMessage<T> },
|
||||
Other(FromEngine<EngineApiRequest<T, N>, N::Block>),
|
||||
}
|
||||
|
||||
/// The engine API tree handler implementation.
|
||||
///
|
||||
/// This type is responsible for processing engine API requests, maintaining the canonical state and
|
||||
@@ -276,6 +288,8 @@ where
|
||||
incoming_tx: Sender<FromEngine<EngineApiRequest<T, N>, N::Block>>,
|
||||
/// Incoming engine API requests.
|
||||
incoming: Receiver<FromEngine<EngineApiRequest<T, N>, N::Block>>,
|
||||
/// Stashed engine messages waiting to be processed once persistence catches up.
|
||||
stashed_engine_messages: VecDeque<StashedEngineMessage<T, N>>,
|
||||
/// Outgoing events that are emitted to the handler.
|
||||
outgoing: UnboundedSender<EngineApiEvent<N>>,
|
||||
/// Channels to the persistence layer.
|
||||
@@ -321,6 +335,7 @@ where
|
||||
.field("payload_validator", &self.payload_validator)
|
||||
.field("state", &self.state)
|
||||
.field("incoming_tx", &self.incoming_tx)
|
||||
.field("stashed_engine_messages", &self.stashed_engine_messages.len())
|
||||
.field("persistence", &self.persistence)
|
||||
.field("persistence_state", &self.persistence_state)
|
||||
.field("backfill_sync_state", &self.backfill_sync_state)
|
||||
@@ -381,6 +396,7 @@ where
|
||||
consensus,
|
||||
payload_validator,
|
||||
incoming,
|
||||
stashed_engine_messages: VecDeque::new(),
|
||||
outgoing,
|
||||
persistence,
|
||||
persistence_state,
|
||||
@@ -472,12 +488,169 @@ where
|
||||
self.incoming_tx.clone()
|
||||
}
|
||||
|
||||
fn update_backpressure_stash_len_metric(&self) {
|
||||
self.metrics.engine.backpressure_buffer_len.set(self.stashed_engine_messages.len() as f64);
|
||||
}
|
||||
|
||||
/// Stashes an incoming engine message without processing it. Called from
|
||||
/// `wait_for_persistence_event` to collect messages that arrive while we are blocked
|
||||
/// waiting for persistence to complete.
|
||||
fn stash_incoming_message(
|
||||
&mut self,
|
||||
message: FromEngine<EngineApiRequest<T, N>, N::Block>,
|
||||
) -> Result<(), InsertBlockFatalError> {
|
||||
match message {
|
||||
FromEngine::Request(EngineApiRequest::Beacon(request)) => {
|
||||
self.stash_beacon_message(request);
|
||||
}
|
||||
other => self.stash_engine_message(other),
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Stashes a beacon message (newPayload / forkchoiceUpdated). Stamps it with `stashed_at`
|
||||
/// so we can measure how long it waited once it is eventually processed.
|
||||
fn stash_beacon_message(&mut self, message: BeaconEngineMessage<T>) {
|
||||
self.stashed_engine_messages
|
||||
.push_back(StashedEngineMessage::Beacon { stashed_at: Instant::now(), message });
|
||||
self.update_backpressure_stash_len_metric();
|
||||
}
|
||||
|
||||
/// Stashes a non-beacon engine message (e.g. downloaded blocks). These don't carry a
|
||||
/// timestamp because we don't track backpressure wait times for non-beacon messages.
|
||||
fn stash_engine_message(&mut self, message: FromEngine<EngineApiRequest<T, N>, N::Block>) {
|
||||
self.stashed_engine_messages.push_back(StashedEngineMessage::Other(message));
|
||||
self.update_backpressure_stash_len_metric();
|
||||
}
|
||||
|
||||
/// How many blocks the canonical tip is ahead of the last persisted block. A large gap means
|
||||
/// persistence is falling behind execution.
|
||||
const fn persistence_gap(&self) -> u64 {
|
||||
self.state
|
||||
.tree_state
|
||||
.canonical_block_number()
|
||||
.saturating_sub(self.persistence_state.last_persisted_block.number)
|
||||
}
|
||||
|
||||
/// Returns `true` when the main loop should stall processing of stashed beacon messages.
|
||||
///
|
||||
/// This is the case when there are messages waiting AND the persistence gap has reached the
|
||||
/// configured threshold — meaning we've accumulated enough unpersisted blocks that we need
|
||||
/// to let persistence catch up before executing more.
|
||||
fn should_backpressure(&self) -> bool {
|
||||
!self.stashed_engine_messages.is_empty() &&
|
||||
self.persistence_gap() >= self.config.persistence_backpressure_threshold()
|
||||
}
|
||||
|
||||
fn try_process_stashed_engine_message(
|
||||
&mut self,
|
||||
) -> Result<Option<ops::ControlFlow<()>>, InsertBlockFatalError> {
|
||||
if self.should_backpressure() {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
let Some(queued) = self.stashed_engine_messages.pop_front() else {
|
||||
return Ok(None);
|
||||
};
|
||||
self.update_backpressure_stash_len_metric();
|
||||
|
||||
match queued {
|
||||
StashedEngineMessage::Beacon { stashed_at, message } => {
|
||||
let wait = stashed_at.elapsed();
|
||||
match &message {
|
||||
BeaconEngineMessage::NewPayload { .. } |
|
||||
BeaconEngineMessage::RethNewPayload { .. } => {
|
||||
self.metrics.engine.new_payload_backpressure_wait_seconds.record(wait);
|
||||
}
|
||||
BeaconEngineMessage::ForkchoiceUpdated { .. } => {
|
||||
self.metrics.engine.fcu_backpressure_wait_seconds.record(wait);
|
||||
}
|
||||
}
|
||||
|
||||
self.process_beacon_message(message, Some(wait)).map(Some)
|
||||
}
|
||||
StashedEngineMessage::Other(message) => self.on_engine_message(message).map(Some),
|
||||
}
|
||||
}
|
||||
|
||||
/// Run the engine API handler.
|
||||
///
|
||||
/// This will block the current thread and process incoming messages.
|
||||
pub fn run(mut self) {
|
||||
loop {
|
||||
match self.wait_for_event() {
|
||||
// Persistence backpressure: beacon messages (newPayload, forkchoiceUpdated) are
|
||||
// not processed inline - they are always pushed into `stashed_engine_messages`
|
||||
// first (see `on_engine_message`). Before we block-wait for new events, we try to
|
||||
// make progress on two fronts:
|
||||
//
|
||||
// 1. Poll for persistence completion (non-blocking). If a background flush finished,
|
||||
// handle it and restart the loop - this shrinks the gap between the canonical tip
|
||||
// and the last persisted block.
|
||||
//
|
||||
// 2. Try to drain one stashed message. This only succeeds when the persistence gap is
|
||||
// below `persistence_backpressure_threshold`; otherwise the stash stays blocked and
|
||||
// we skip to the wait below.
|
||||
//
|
||||
// If both checks fall through without doing work, we need to wait for an external
|
||||
// event. The wait strategy depends on whether we are backpressured:
|
||||
//
|
||||
// - Backpressured (gap >= threshold, stash non-empty): we call
|
||||
// `wait_for_persistence_event` which blocks until persistence completes. Any
|
||||
// incoming messages that arrive in the meantime are stashed, not processed. This is
|
||||
// what creates the actual back-pressure — the CL's requests sit in the stash and
|
||||
// their response channels stay open until we catch up.
|
||||
//
|
||||
// - Normal: we call `wait_for_event` which accepts whichever channel fires first —
|
||||
// persistence completion or incoming message.
|
||||
match self.try_poll_persistence_completion() {
|
||||
Ok(true) => {
|
||||
if let Err(err) = self.advance_persistence() {
|
||||
error!(target: "engine::tree", %err, "Advancing persistence failed");
|
||||
return
|
||||
}
|
||||
continue;
|
||||
}
|
||||
Ok(false) => {}
|
||||
Err(err) => {
|
||||
error!(target: "engine::tree", %err, "Polling persistence failed");
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
match self.try_process_stashed_engine_message() {
|
||||
Ok(Some(ops::ControlFlow::Break(()))) => return,
|
||||
Ok(Some(ops::ControlFlow::Continue(()))) => {
|
||||
if let Err(err) = self.advance_persistence() {
|
||||
error!(target: "engine::tree", %err, "Advancing persistence failed");
|
||||
return
|
||||
}
|
||||
continue;
|
||||
}
|
||||
Ok(None) => {}
|
||||
Err(fatal) => {
|
||||
error!(target: "engine::tree", %fatal, "insert block fatal error");
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
let event = if self.should_backpressure() {
|
||||
if let Err(err) = self.advance_persistence() {
|
||||
error!(target: "engine::tree", %err, "Advancing persistence failed");
|
||||
return
|
||||
}
|
||||
match self.wait_for_persistence_event() {
|
||||
Ok(event) => event,
|
||||
Err(fatal) => {
|
||||
error!(target: "engine::tree", %fatal, "insert block fatal error");
|
||||
return;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
self.wait_for_event()
|
||||
};
|
||||
|
||||
match event {
|
||||
LoopEvent::EngineMessage(msg) => {
|
||||
debug!(target: "engine::tree", %msg, "received new engine message");
|
||||
match self.on_engine_message(msg) {
|
||||
@@ -512,6 +685,39 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
/// Blocks until a persistence task completes, used when we are under backpressure.
|
||||
///
|
||||
/// While waiting, incoming messages continue to be read from the channel (so senders don't
|
||||
/// block) but they are stashed rather than processed. The only event that can break out of
|
||||
/// this wait is persistence completion — that's what makes it "backpressure": we refuse to
|
||||
/// do new work until persistence catches up.
|
||||
///
|
||||
/// Falls back to the normal `wait_for_event` if no persistence task is in flight.
|
||||
fn wait_for_persistence_event(&mut self) -> Result<LoopEvent<T, N>, InsertBlockFatalError> {
|
||||
let maybe_persistence = self.persistence_state.rx.take();
|
||||
|
||||
if let Some((persistence_rx, start_time, _action)) = maybe_persistence {
|
||||
loop {
|
||||
crossbeam_channel::select_biased! {
|
||||
recv(persistence_rx) -> result => {
|
||||
return Ok(match result {
|
||||
Ok(result) => LoopEvent::PersistenceComplete { result, start_time },
|
||||
Err(_) => LoopEvent::Disconnected,
|
||||
})
|
||||
}
|
||||
recv(self.incoming) -> msg => {
|
||||
match msg {
|
||||
Ok(message) => self.stash_incoming_message(message)?,
|
||||
Err(_) => return Ok(LoopEvent::Disconnected),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
Ok(self.wait_for_event())
|
||||
}
|
||||
}
|
||||
|
||||
/// Blocks until the next event is ready: either an incoming engine message or a persistence
|
||||
/// completion (if one is in progress).
|
||||
///
|
||||
@@ -1341,8 +1547,7 @@ where
|
||||
/// Tries to poll for a completed persistence task (non-blocking).
|
||||
///
|
||||
/// Returns `true` if a persistence task was completed, `false` otherwise.
|
||||
#[cfg(test)]
|
||||
pub fn try_poll_persistence(&mut self) -> Result<bool, AdvancePersistenceError> {
|
||||
fn try_poll_persistence_completion(&mut self) -> Result<bool, AdvancePersistenceError> {
|
||||
let Some((rx, start_time, action)) = self.persistence_state.rx.take() else {
|
||||
return Ok(false);
|
||||
};
|
||||
@@ -1363,6 +1568,11 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
fn try_poll_persistence(&mut self) -> Result<bool, AdvancePersistenceError> {
|
||||
self.try_poll_persistence_completion()
|
||||
}
|
||||
|
||||
/// Handles a completed persistence task.
|
||||
fn on_persistence_complete(
|
||||
&mut self,
|
||||
@@ -1479,165 +1689,7 @@ where
|
||||
));
|
||||
}
|
||||
EngineApiRequest::Beacon(request) => {
|
||||
match request {
|
||||
BeaconEngineMessage::ForkchoiceUpdated { state, payload_attrs, tx } => {
|
||||
let has_attrs = payload_attrs.is_some();
|
||||
|
||||
let start = Instant::now();
|
||||
let mut output = self.on_forkchoice_updated(state, payload_attrs);
|
||||
|
||||
if let Ok(res) = &mut output {
|
||||
// track last received forkchoice state
|
||||
self.state
|
||||
.forkchoice_state_tracker
|
||||
.set_latest(state, res.outcome.forkchoice_status());
|
||||
|
||||
// emit an event about the handled FCU
|
||||
self.emit_event(ConsensusEngineEvent::ForkchoiceUpdated(
|
||||
state,
|
||||
res.outcome.forkchoice_status(),
|
||||
));
|
||||
|
||||
// handle the event if any
|
||||
self.on_maybe_tree_event(res.event.take())?;
|
||||
}
|
||||
|
||||
if let Err(ref err) = output {
|
||||
error!(target: "engine::tree", %err, ?state, "Error processing forkchoice update");
|
||||
}
|
||||
|
||||
self.metrics.engine.forkchoice_updated.update_response_metrics(
|
||||
start,
|
||||
&mut self.metrics.engine.new_payload.latest_finish_at,
|
||||
has_attrs,
|
||||
&output,
|
||||
);
|
||||
|
||||
if let Err(err) =
|
||||
tx.send(output.map(|o| o.outcome).map_err(Into::into))
|
||||
{
|
||||
self.metrics
|
||||
.engine
|
||||
.failed_forkchoice_updated_response_deliveries
|
||||
.increment(1);
|
||||
warn!(target: "engine::tree", ?state, elapsed=?start.elapsed(), "Failed to deliver forkchoiceUpdated response, receiver dropped (request cancelled): {err:?}");
|
||||
}
|
||||
}
|
||||
BeaconEngineMessage::NewPayload { payload, tx } => {
|
||||
let start = Instant::now();
|
||||
let gas_used = payload.gas_used();
|
||||
let num_hash = payload.num_hash();
|
||||
let mut output = self.on_new_payload(payload);
|
||||
self.metrics.engine.new_payload.update_response_metrics(
|
||||
start,
|
||||
&mut self.metrics.engine.forkchoice_updated.latest_finish_at,
|
||||
&output,
|
||||
gas_used,
|
||||
);
|
||||
|
||||
let maybe_event =
|
||||
output.as_mut().ok().and_then(|out| out.event.take());
|
||||
|
||||
// emit response
|
||||
if let Err(err) =
|
||||
tx.send(output.map(|o| o.outcome).map_err(|e| {
|
||||
BeaconOnNewPayloadError::Internal(Box::new(e))
|
||||
}))
|
||||
{
|
||||
warn!(target: "engine::tree", payload=?num_hash, elapsed=?start.elapsed(), "Failed to deliver newPayload response, receiver dropped (request cancelled): {err:?}");
|
||||
self.metrics
|
||||
.engine
|
||||
.failed_new_payload_response_deliveries
|
||||
.increment(1);
|
||||
}
|
||||
|
||||
// handle the event if any
|
||||
self.on_maybe_tree_event(maybe_event)?;
|
||||
}
|
||||
BeaconEngineMessage::RethNewPayload {
|
||||
payload,
|
||||
wait_for_persistence,
|
||||
wait_for_caches,
|
||||
tx,
|
||||
} => {
|
||||
debug!(
|
||||
target: "engine::tree",
|
||||
wait_for_persistence,
|
||||
wait_for_caches,
|
||||
"Processing reth_newPayload"
|
||||
);
|
||||
|
||||
let persistence_wait = if wait_for_persistence {
|
||||
let pending_persistence = self.persistence_state.rx.take();
|
||||
if let Some((rx, start_time, _action)) = pending_persistence {
|
||||
let (persistence_tx, persistence_rx) =
|
||||
std::sync::mpsc::channel();
|
||||
self.runtime.spawn_blocking_named(
|
||||
"wait-persist",
|
||||
move || {
|
||||
let start = Instant::now();
|
||||
let result = rx
|
||||
.recv()
|
||||
.expect("persistence state channel closed");
|
||||
let _ = persistence_tx.send((
|
||||
result,
|
||||
start_time,
|
||||
start.elapsed(),
|
||||
));
|
||||
},
|
||||
);
|
||||
let (result, start_time, wait_duration) = persistence_rx
|
||||
.recv()
|
||||
.expect("persistence result channel closed");
|
||||
let _ = self.on_persistence_complete(result, start_time);
|
||||
Some(wait_duration)
|
||||
} else {
|
||||
Some(Duration::ZERO)
|
||||
}
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let cache_wait = wait_for_caches
|
||||
.then(|| self.payload_validator.wait_for_caches());
|
||||
|
||||
let start = Instant::now();
|
||||
let gas_used = payload.gas_used();
|
||||
let num_hash = payload.num_hash();
|
||||
let mut output = self.on_new_payload(payload);
|
||||
let latency = start.elapsed();
|
||||
self.metrics.engine.new_payload.update_response_metrics(
|
||||
start,
|
||||
&mut self.metrics.engine.forkchoice_updated.latest_finish_at,
|
||||
&output,
|
||||
gas_used,
|
||||
);
|
||||
|
||||
let maybe_event =
|
||||
output.as_mut().ok().and_then(|out| out.event.take());
|
||||
|
||||
let timings = NewPayloadTimings {
|
||||
latency,
|
||||
persistence_wait,
|
||||
execution_cache_wait: cache_wait
|
||||
.map(|wait| wait.execution_cache),
|
||||
sparse_trie_wait: cache_wait.map(|wait| wait.sparse_trie),
|
||||
};
|
||||
if let Err(err) =
|
||||
tx.send(output.map(|o| (o.outcome, timings)).map_err(|e| {
|
||||
BeaconOnNewPayloadError::Internal(Box::new(e))
|
||||
}))
|
||||
{
|
||||
error!(target: "engine::tree", payload=?num_hash, elapsed=?start.elapsed(), "Failed to send event: {err:?}");
|
||||
self.metrics
|
||||
.engine
|
||||
.failed_new_payload_response_deliveries
|
||||
.increment(1);
|
||||
}
|
||||
|
||||
self.on_maybe_tree_event(maybe_event)?;
|
||||
}
|
||||
}
|
||||
self.stash_beacon_message(request);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1650,6 +1702,144 @@ where
|
||||
Ok(ops::ControlFlow::Continue(()))
|
||||
}
|
||||
|
||||
fn process_beacon_message(
|
||||
&mut self,
|
||||
request: BeaconEngineMessage<T>,
|
||||
backpressure_wait: Option<Duration>,
|
||||
) -> Result<ops::ControlFlow<()>, InsertBlockFatalError> {
|
||||
match request {
|
||||
BeaconEngineMessage::ForkchoiceUpdated { state, payload_attrs, tx } => {
|
||||
let has_attrs = payload_attrs.is_some();
|
||||
|
||||
let start = Instant::now();
|
||||
let mut output = self.on_forkchoice_updated(state, payload_attrs);
|
||||
|
||||
if let Ok(res) = &mut output {
|
||||
self.state
|
||||
.forkchoice_state_tracker
|
||||
.set_latest(state, res.outcome.forkchoice_status());
|
||||
|
||||
self.emit_event(ConsensusEngineEvent::ForkchoiceUpdated(
|
||||
state,
|
||||
res.outcome.forkchoice_status(),
|
||||
));
|
||||
|
||||
self.on_maybe_tree_event(res.event.take())?;
|
||||
}
|
||||
|
||||
if let Err(ref err) = output {
|
||||
error!(target: "engine::tree", %err, ?state, "Error processing forkchoice update");
|
||||
}
|
||||
|
||||
self.metrics.engine.forkchoice_updated.update_response_metrics(
|
||||
start,
|
||||
&mut self.metrics.engine.new_payload.latest_finish_at,
|
||||
has_attrs,
|
||||
&output,
|
||||
);
|
||||
|
||||
if let Err(err) = tx.send(output.map(|o| o.outcome).map_err(Into::into)) {
|
||||
self.metrics.engine.failed_forkchoice_updated_response_deliveries.increment(1);
|
||||
warn!(target: "engine::tree", ?state, elapsed=?start.elapsed(), "Failed to deliver forkchoiceUpdated response, receiver dropped (request cancelled): {err:?}");
|
||||
}
|
||||
}
|
||||
BeaconEngineMessage::NewPayload { payload, tx } => {
|
||||
let start = Instant::now();
|
||||
let gas_used = payload.gas_used();
|
||||
let num_hash = payload.num_hash();
|
||||
let mut output = self.on_new_payload(payload);
|
||||
self.metrics.engine.new_payload.update_response_metrics(
|
||||
start,
|
||||
&mut self.metrics.engine.forkchoice_updated.latest_finish_at,
|
||||
&output,
|
||||
gas_used,
|
||||
);
|
||||
|
||||
let maybe_event = output.as_mut().ok().and_then(|out| out.event.take());
|
||||
|
||||
if let Err(err) = tx.send(
|
||||
output
|
||||
.map(|o| o.outcome)
|
||||
.map_err(|e| BeaconOnNewPayloadError::Internal(Box::new(e))),
|
||||
) {
|
||||
warn!(target: "engine::tree", payload=?num_hash, elapsed=?start.elapsed(), "Failed to deliver newPayload response, receiver dropped (request cancelled): {err:?}");
|
||||
self.metrics.engine.failed_new_payload_response_deliveries.increment(1);
|
||||
}
|
||||
|
||||
self.on_maybe_tree_event(maybe_event)?;
|
||||
}
|
||||
BeaconEngineMessage::RethNewPayload {
|
||||
payload,
|
||||
wait_for_persistence,
|
||||
wait_for_caches,
|
||||
tx,
|
||||
} => {
|
||||
debug!(
|
||||
target: "engine::tree",
|
||||
wait_for_persistence,
|
||||
wait_for_caches,
|
||||
"Processing reth_newPayload"
|
||||
);
|
||||
|
||||
let persistence_wait = if wait_for_persistence {
|
||||
let pending_persistence = self.persistence_state.rx.take();
|
||||
if let Some((rx, start_time, _action)) = pending_persistence {
|
||||
let (persistence_tx, persistence_rx) = std::sync::mpsc::channel();
|
||||
self.runtime.spawn_blocking_named("wait-persist", move || {
|
||||
let start = Instant::now();
|
||||
let result = rx.recv().expect("persistence state channel closed");
|
||||
let _ = persistence_tx.send((result, start_time, start.elapsed()));
|
||||
});
|
||||
let (result, start_time, wait_duration) =
|
||||
persistence_rx.recv().expect("persistence result channel closed");
|
||||
let _ = self.on_persistence_complete(result, start_time);
|
||||
Some(wait_duration)
|
||||
} else {
|
||||
Some(Duration::ZERO)
|
||||
}
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let cache_wait = wait_for_caches.then(|| self.payload_validator.wait_for_caches());
|
||||
|
||||
let start = Instant::now();
|
||||
let gas_used = payload.gas_used();
|
||||
let num_hash = payload.num_hash();
|
||||
let mut output = self.on_new_payload(payload);
|
||||
let latency = start.elapsed();
|
||||
self.metrics.engine.new_payload.update_response_metrics(
|
||||
start,
|
||||
&mut self.metrics.engine.forkchoice_updated.latest_finish_at,
|
||||
&output,
|
||||
gas_used,
|
||||
);
|
||||
|
||||
let maybe_event = output.as_mut().ok().and_then(|out| out.event.take());
|
||||
|
||||
let timings = NewPayloadTimings {
|
||||
latency,
|
||||
backpressure_wait,
|
||||
persistence_wait,
|
||||
execution_cache_wait: cache_wait.map(|wait| wait.execution_cache),
|
||||
sparse_trie_wait: cache_wait.map(|wait| wait.sparse_trie),
|
||||
};
|
||||
if let Err(err) = tx.send(
|
||||
output
|
||||
.map(|o| (o.outcome, timings))
|
||||
.map_err(|e| BeaconOnNewPayloadError::Internal(Box::new(e))),
|
||||
) {
|
||||
error!(target: "engine::tree", payload=?num_hash, elapsed=?start.elapsed(), "Failed to send event: {err:?}");
|
||||
self.metrics.engine.failed_new_payload_response_deliveries.increment(1);
|
||||
}
|
||||
|
||||
self.on_maybe_tree_event(maybe_event)?;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(ops::ControlFlow::Continue(()))
|
||||
}
|
||||
|
||||
/// Invoked if the backfill sync has finished to target.
|
||||
///
|
||||
/// At this point we consider the block synced to the backfill target.
|
||||
|
||||
@@ -12,7 +12,7 @@ use reth_trie_db::ChangesetCache;
|
||||
use alloy_eips::eip1898::BlockWithParent;
|
||||
use alloy_primitives::{
|
||||
map::{B256Map, B256Set},
|
||||
Bytes, B256,
|
||||
Address, Bytes, B256,
|
||||
};
|
||||
use alloy_rlp::Decodable;
|
||||
use alloy_rpc_types_engine::{
|
||||
@@ -286,6 +286,11 @@ impl TestHarness {
|
||||
self
|
||||
}
|
||||
|
||||
fn process_next_stashed_message(&mut self) {
|
||||
let processed = self.tree.try_process_stashed_engine_message().unwrap();
|
||||
assert!(processed.is_some(), "expected a stashed message to be processed");
|
||||
}
|
||||
|
||||
async fn fcu_to(&mut self, block_hash: B256, fcu_status: impl Into<ForkchoiceStatus>) {
|
||||
let fcu_status = fcu_status.into();
|
||||
|
||||
@@ -309,6 +314,7 @@ impl TestHarness {
|
||||
.into(),
|
||||
))
|
||||
.unwrap();
|
||||
self.process_next_stashed_message();
|
||||
|
||||
let response = rx.await.unwrap().unwrap().await.unwrap();
|
||||
match fcu_status.into() {
|
||||
@@ -608,6 +614,7 @@ async fn test_engine_request_during_backfill() {
|
||||
.into(),
|
||||
))
|
||||
.unwrap();
|
||||
test_harness.process_next_stashed_message();
|
||||
|
||||
let resp = rx.await.unwrap().unwrap().await.unwrap();
|
||||
assert!(resp.payload_status.is_syncing());
|
||||
@@ -686,11 +693,227 @@ async fn test_holesky_payload() {
|
||||
.into(),
|
||||
))
|
||||
.unwrap();
|
||||
test_harness.process_next_stashed_message();
|
||||
|
||||
let resp = rx.await.unwrap().unwrap();
|
||||
assert!(resp.is_syncing());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_stashed_beacon_message_processes_below_backpressure_threshold() {
|
||||
let mut test_harness = TestHarness::new(MAINNET.clone());
|
||||
test_harness.tree.config = test_harness
|
||||
.tree
|
||||
.config
|
||||
.with_persistence_threshold(0)
|
||||
.with_persistence_backpressure_threshold(1);
|
||||
|
||||
let (tx, mut rx) = oneshot::channel();
|
||||
let _ = test_harness
|
||||
.tree
|
||||
.on_engine_message(FromEngine::Request(
|
||||
BeaconEngineMessage::ForkchoiceUpdated {
|
||||
state: ForkchoiceState {
|
||||
head_block_hash: B256::random(),
|
||||
safe_block_hash: B256::random(),
|
||||
finalized_block_hash: B256::random(),
|
||||
},
|
||||
payload_attrs: None,
|
||||
tx,
|
||||
}
|
||||
.into(),
|
||||
))
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(test_harness.tree.stashed_engine_messages.len(), 1);
|
||||
assert!(!test_harness.tree.should_backpressure());
|
||||
|
||||
test_harness.process_next_stashed_message();
|
||||
assert!(rx.try_recv().is_ok(), "expected stashed response after processing");
|
||||
assert!(test_harness.tree.stashed_engine_messages.is_empty());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_stashed_beacon_message_stays_stashed_while_backpressured() {
|
||||
let blocks: Vec<_> = TestBlockBuilder::eth().get_executed_blocks(1..4).collect();
|
||||
let mut test_harness = TestHarness::new(MAINNET.clone()).with_blocks(blocks.clone());
|
||||
test_harness.tree.config = test_harness
|
||||
.tree
|
||||
.config
|
||||
.with_persistence_threshold(0)
|
||||
.with_persistence_backpressure_threshold(1);
|
||||
|
||||
let (_persist_tx, persist_rx) = crossbeam_channel::bounded(1);
|
||||
test_harness
|
||||
.tree
|
||||
.persistence_state
|
||||
.start_save(blocks.last().unwrap().recovered_block().num_hash(), persist_rx);
|
||||
|
||||
let (tx, _rx) = oneshot::channel();
|
||||
let _ = test_harness
|
||||
.tree
|
||||
.on_engine_message(FromEngine::Request(
|
||||
BeaconEngineMessage::ForkchoiceUpdated {
|
||||
state: ForkchoiceState {
|
||||
head_block_hash: B256::random(),
|
||||
safe_block_hash: B256::random(),
|
||||
finalized_block_hash: B256::random(),
|
||||
},
|
||||
payload_attrs: None,
|
||||
tx,
|
||||
}
|
||||
.into(),
|
||||
))
|
||||
.unwrap();
|
||||
|
||||
assert!(test_harness.tree.should_backpressure());
|
||||
assert!(test_harness.tree.try_process_stashed_engine_message().unwrap().is_none());
|
||||
assert_eq!(test_harness.tree.stashed_engine_messages.len(), 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_backpressure_waits_for_persistence_before_reading_incoming() {
|
||||
let blocks: Vec<_> = TestBlockBuilder::eth().get_executed_blocks(1..4).collect();
|
||||
let mut test_harness = TestHarness::new(MAINNET.clone()).with_blocks(blocks.clone());
|
||||
test_harness.tree.config = test_harness
|
||||
.tree
|
||||
.config
|
||||
.with_persistence_threshold(0)
|
||||
.with_persistence_backpressure_threshold(1);
|
||||
|
||||
let (persist_tx, persist_rx) = crossbeam_channel::bounded(1);
|
||||
let persisted = blocks.last().unwrap().recovered_block().num_hash();
|
||||
test_harness.tree.persistence_state.start_save(persisted, persist_rx);
|
||||
|
||||
let (tx, _rx) = oneshot::channel();
|
||||
let _ = test_harness
|
||||
.tree
|
||||
.on_engine_message(FromEngine::Request(
|
||||
BeaconEngineMessage::ForkchoiceUpdated {
|
||||
state: ForkchoiceState {
|
||||
head_block_hash: B256::random(),
|
||||
safe_block_hash: B256::random(),
|
||||
finalized_block_hash: B256::random(),
|
||||
},
|
||||
payload_attrs: None,
|
||||
tx,
|
||||
}
|
||||
.into(),
|
||||
))
|
||||
.unwrap();
|
||||
|
||||
test_harness.to_tree_tx.send(FromEngine::DownloadedBlocks(vec![])).unwrap();
|
||||
|
||||
std::thread::spawn(move || {
|
||||
std::thread::sleep(Duration::from_millis(10));
|
||||
persist_tx
|
||||
.send(PersistenceResult {
|
||||
last_block: Some(persisted),
|
||||
commit_duration: Some(Duration::ZERO),
|
||||
})
|
||||
.unwrap();
|
||||
});
|
||||
|
||||
let event = test_harness.tree.wait_for_persistence_event().unwrap();
|
||||
assert!(matches!(event, super::LoopEvent::PersistenceComplete { .. }));
|
||||
assert_eq!(test_harness.tree.stashed_engine_messages.len(), 2);
|
||||
|
||||
let super::LoopEvent::PersistenceComplete { result, start_time } = event else {
|
||||
unreachable!()
|
||||
};
|
||||
test_harness.tree.on_persistence_complete(result, start_time).unwrap();
|
||||
|
||||
test_harness.process_next_stashed_message();
|
||||
assert!(test_harness.tree.stashed_engine_messages.len() == 1);
|
||||
test_harness.process_next_stashed_message();
|
||||
assert!(test_harness.tree.stashed_engine_messages.is_empty());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_backpressure_handler_enqueues_attrs_fcu() {
|
||||
let blocks: Vec<_> = TestBlockBuilder::eth().get_executed_blocks(1..2).collect();
|
||||
let mut test_harness = TestHarness::new(MAINNET.clone()).with_blocks(blocks);
|
||||
test_harness.tree.config = test_harness
|
||||
.tree
|
||||
.config
|
||||
.clone()
|
||||
.with_persistence_threshold(0)
|
||||
.with_persistence_backpressure_threshold(1)
|
||||
.with_always_process_payload_attributes_on_canonical_head(true);
|
||||
|
||||
let (_persist_tx, persist_rx) = crossbeam_channel::bounded(1);
|
||||
let persisted = test_harness.tree.state.tree_state.current_canonical_head;
|
||||
test_harness.tree.persistence_state.start_save(persisted, persist_rx);
|
||||
|
||||
let head = test_harness.tree.canonical_in_memory_state.get_canonical_head();
|
||||
let (tx, mut rx) = oneshot::channel();
|
||||
test_harness
|
||||
.tree
|
||||
.stash_incoming_message(FromEngine::Request(
|
||||
BeaconEngineMessage::ForkchoiceUpdated {
|
||||
state: ForkchoiceState {
|
||||
head_block_hash: head.hash(),
|
||||
safe_block_hash: B256::ZERO,
|
||||
finalized_block_hash: B256::ZERO,
|
||||
},
|
||||
payload_attrs: Some(alloy_rpc_types_engine::PayloadAttributes {
|
||||
timestamp: head.timestamp().saturating_add(1),
|
||||
prev_randao: B256::ZERO,
|
||||
suggested_fee_recipient: Address::ZERO,
|
||||
withdrawals: None,
|
||||
parent_beacon_block_root: None,
|
||||
}),
|
||||
tx,
|
||||
}
|
||||
.into(),
|
||||
))
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(test_harness.tree.stashed_engine_messages.len(), 1);
|
||||
assert!(rx.try_recv().is_err());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_reth_new_payload_reports_backpressure_wait() {
|
||||
let s = include_str!("../../test-data/holesky/2.rlp");
|
||||
let data = Bytes::from_str(s).unwrap();
|
||||
let block = Block::decode(&mut data.as_ref()).unwrap();
|
||||
let sealed = block.seal_slow();
|
||||
let hash = sealed.hash();
|
||||
let block = sealed.into_block();
|
||||
let payload = ExecutionPayloadV1::from_block_unchecked(hash, &block);
|
||||
|
||||
let mut test_harness = TestHarness::new(HOLESKY.clone());
|
||||
let (tx, rx) = oneshot::channel();
|
||||
let _ = test_harness
|
||||
.tree
|
||||
.on_engine_message(FromEngine::Request(
|
||||
BeaconEngineMessage::RethNewPayload {
|
||||
payload: ExecutionData {
|
||||
payload: payload.into(),
|
||||
sidecar: ExecutionPayloadSidecar::none(),
|
||||
},
|
||||
wait_for_persistence: false,
|
||||
wait_for_caches: false,
|
||||
tx,
|
||||
}
|
||||
.into(),
|
||||
))
|
||||
.unwrap();
|
||||
|
||||
// `backpressure_wait` is measured as `stashed_at.elapsed()` when the queued
|
||||
// message is drained. Without a small delay here, stashing and draining can
|
||||
// happen in the same clock tick and legitimately report `Duration::ZERO`,
|
||||
// which would make the `> Duration::ZERO` assertion below flaky.
|
||||
std::thread::sleep(Duration::from_millis(10));
|
||||
test_harness.process_next_stashed_message();
|
||||
|
||||
let (_, timings) = rx.await.unwrap().unwrap();
|
||||
assert!(timings.backpressure_wait.is_some());
|
||||
assert!(timings.backpressure_wait.unwrap() > Duration::ZERO);
|
||||
assert_eq!(timings.persistence_wait, None);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_tree_state_on_new_head_reorg() {
|
||||
reth_tracing::init_test_tracing();
|
||||
@@ -1093,6 +1316,7 @@ async fn test_fcu_with_canonical_ancestor_updates_latest_block() {
|
||||
.into(),
|
||||
))
|
||||
.unwrap();
|
||||
test_harness.process_next_stashed_message();
|
||||
|
||||
// Verify FCU succeeds
|
||||
let response = rx.await.unwrap().unwrap().await.unwrap();
|
||||
|
||||
@@ -13,6 +13,11 @@ pub struct RethPayloadStatus {
|
||||
pub status: PayloadStatus,
|
||||
/// Server-side execution latency in microseconds.
|
||||
pub latency_us: u64,
|
||||
/// Time spent waiting in the backpressure queue before processing, in microseconds.
|
||||
///
|
||||
/// `None` when the message was not queued.
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub backpressure_wait_us: Option<u64>,
|
||||
/// Time spent waiting for persistence to complete, in microseconds.
|
||||
///
|
||||
/// `None` when wasn't asked to wait.
|
||||
|
||||
@@ -55,6 +55,7 @@ impl<Payload: PayloadTypes> RethEngineApiServer<Payload::ExecutionData> for Reth
|
||||
Ok(RethPayloadStatus {
|
||||
status,
|
||||
latency_us: timings.latency.as_micros() as u64,
|
||||
backpressure_wait_us: timings.backpressure_wait.map(|d| d.as_micros() as u64),
|
||||
persistence_wait_us: timings.persistence_wait.map(|d| d.as_micros() as u64),
|
||||
execution_cache_wait_us: timings.execution_cache_wait.map(|d| d.as_micros() as u64),
|
||||
sparse_trie_wait_us: timings.sparse_trie_wait.map(|d| d.as_micros() as u64),
|
||||
|
||||
Reference in New Issue
Block a user