From 881500e5920bfe02c0ca97166ec255a7ddc181bc Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Mon, 16 Feb 2026 11:11:13 +0000 Subject: [PATCH] feat(rpc, reth-bench): `reth_newPayload` methods for reth-bench (#22133) Co-authored-by: Amp Co-authored-by: Georgios Konstantopoulos --- Cargo.lock | 1 + bin/reth-bench/src/bench/context.rs | 12 +- bin/reth-bench/src/bench/gas_limit_ramp.rs | 26 +++- bin/reth-bench/src/bench/new_payload_fcu.rs | 43 +++++- bin/reth-bench/src/bench/new_payload_only.rs | 31 ++++- bin/reth-bench/src/bench/output.rs | 28 +++- bin/reth-bench/src/bench/replay_payloads.rs | 87 +++++++++--- bin/reth-bench/src/valid_payload.rs | 130 +++++++++++++++--- crates/engine/primitives/src/message.rs | 47 +++++++ crates/engine/tree/src/launch.rs | 4 +- crates/engine/tree/src/tree/mod.rs | 79 ++++++++++- .../tree/src/tree/payload_processor/mod.rs | 82 +++++++++++ .../preserved_sparse_trie.rs | 23 +++- .../engine/tree/src/tree/payload_validator.rs | 13 +- crates/engine/util/src/engine_store.rs | 3 +- crates/node/builder/src/rpc.rs | 7 +- crates/node/core/src/args/benchmark_args.rs | 8 ++ crates/rpc/rpc-api/Cargo.toml | 1 + crates/rpc/rpc-api/src/lib.rs | 3 + crates/rpc/rpc-api/src/reth_engine.rs | 39 ++++++ crates/rpc/rpc-engine-api/src/engine_api.rs | 62 ++++++++- 21 files changed, 655 insertions(+), 74 deletions(-) create mode 100644 crates/rpc/rpc-api/src/reth_engine.rs diff --git a/Cargo.lock b/Cargo.lock index 8614146260..7ec0ac0a12 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9844,6 +9844,7 @@ dependencies = [ "reth-network-peers", "reth-rpc-eth-api", "reth-trie-common", + "serde", "serde_json", "tokio", ] diff --git a/bin/reth-bench/src/bench/context.rs b/bin/reth-bench/src/bench/context.rs index 727ad8a914..112e6ec3a5 100644 --- a/bin/reth-bench/src/bench/context.rs +++ b/bin/reth-bench/src/bench/context.rs @@ -29,6 +29,8 @@ pub(crate) struct BenchContext { pub(crate) next_block: u64, /// Whether the chain is an OP rollup. 
pub(crate) is_optimism: bool, + /// Whether to use `reth_newPayload` endpoint instead of `engine_newPayload*`. + pub(crate) use_reth_namespace: bool, } impl BenchContext { @@ -140,6 +142,14 @@ impl BenchContext { }; let next_block = first_block.header.number + 1; - Ok(Self { auth_provider, block_provider, benchmark_mode, next_block, is_optimism }) + let use_reth_namespace = bench_args.reth_new_payload; + Ok(Self { + auth_provider, + block_provider, + benchmark_mode, + next_block, + is_optimism, + use_reth_namespace, + }) } } diff --git a/bin/reth-bench/src/bench/gas_limit_ramp.rs b/bin/reth-bench/src/bench/gas_limit_ramp.rs index d960e9cf68..7043284fdb 100644 --- a/bin/reth-bench/src/bench/gas_limit_ramp.rs +++ b/bin/reth-bench/src/bench/gas_limit_ramp.rs @@ -6,7 +6,7 @@ use crate::{ helpers::{build_payload, parse_gas_limit, prepare_payload_request, rpc_block_to_header}, output::GasRampPayloadFile, }, - valid_payload::{call_forkchoice_updated, call_new_payload, payload_to_new_payload}, + valid_payload::{call_forkchoice_updated, call_new_payload_with_reth, payload_to_new_payload}, }; use alloy_eips::BlockNumberOrTag; use alloy_provider::{network::AnyNetwork, Provider, RootProvider}; @@ -47,6 +47,14 @@ pub struct Command { /// Output directory for benchmark results and generated payloads. #[arg(long, value_name = "OUTPUT")] output: PathBuf, + + /// Use `reth_newPayload` endpoint instead of `engine_newPayload*`. + /// + /// The `reth_newPayload` endpoint is a reth-specific extension that takes `ExecutionData` + /// directly, waits for persistence and cache updates to complete before processing, + /// and returns server-side timing breakdowns (latency, persistence wait, cache wait). + #[arg(long, default_value = "false", verbatim_doc_comment)] + reth_new_payload: bool, } /// Mode for determining when to stop ramping. 
@@ -138,6 +146,9 @@ impl Command { ); } } + if self.reth_new_payload { + info!("Using reth_newPayload endpoint"); + } let mut blocks_processed = 0u64; let total_benchmark_duration = Instant::now(); @@ -163,7 +174,7 @@ impl Command { // Regenerate the payload from the modified block, but keep the original sidecar // which contains the actual execution requests data (not just the hash) let (payload, _) = ExecutionPayload::from_block_unchecked(block_hash, &block); - let (version, params) = payload_to_new_payload( + let (version, params, execution_data) = payload_to_new_payload( payload, sidecar, false, @@ -174,13 +185,18 @@ impl Command { // Save payload to file with version info for replay let payload_path = self.output.join(format!("payload_block_{}.json", block.header.number)); - let file = - GasRampPayloadFile { version: version as u8, block_hash, params: params.clone() }; + let file = GasRampPayloadFile { + version: version as u8, + block_hash, + params: params.clone(), + execution_data: Some(execution_data.clone()), + }; let payload_json = serde_json::to_string_pretty(&file)?; std::fs::write(&payload_path, &payload_json)?; info!(target: "reth-bench", block_number = block.header.number, path = %payload_path.display(), "Saved payload"); - call_new_payload(&provider, version, params).await?; + let reth_data = self.reth_new_payload.then_some(execution_data); + let _ = call_new_payload_with_reth(&provider, version, params, reth_data).await?; let forkchoice_state = ForkchoiceState { head_block_hash: block_hash, diff --git a/bin/reth-bench/src/bench/new_payload_fcu.rs b/bin/reth-bench/src/bench/new_payload_fcu.rs index 202fe12874..13dc5bcb19 100644 --- a/bin/reth-bench/src/bench/new_payload_fcu.rs +++ b/bin/reth-bench/src/bench/new_payload_fcu.rs @@ -20,7 +20,7 @@ use crate::{ derive_ws_rpc_url, setup_persistence_subscription, PersistenceWaiter, }, }, - valid_payload::{block_to_new_payload, call_forkchoice_updated, call_new_payload}, + 
valid_payload::{block_to_new_payload, call_forkchoice_updated, call_new_payload_with_reth}, }; use alloy_provider::Provider; use alloy_rpc_types_engine::ForkchoiceState; @@ -150,10 +150,15 @@ impl Command { auth_provider, mut next_block, is_optimism, - .. + use_reth_namespace, } = BenchContext::new(&self.benchmark, self.rpc_url).await?; let total_blocks = benchmark_mode.total_blocks(); + + if use_reth_namespace { + info!("Using reth_newPayload endpoint"); + } + let buffer_size = self.rpc_block_buffer_size; // Use a oneshot channel to propagate errors from the spawned task @@ -230,16 +235,40 @@ impl Command { finalized_block_hash: finalized, }; - let (version, params) = block_to_new_payload(block, is_optimism)?; + let (version, params, execution_data) = block_to_new_payload(block, is_optimism)?; let start = Instant::now(); - call_new_payload(&auth_provider, version, params).await?; + let reth_data = use_reth_namespace.then_some(execution_data); + let server_timings = + call_new_payload_with_reth(&auth_provider, version, params, reth_data).await?; - let new_payload_result = NewPayloadResult { gas_used, latency: start.elapsed() }; + let np_latency = + server_timings.as_ref().map(|t| t.latency).unwrap_or_else(|| start.elapsed()); + let new_payload_result = NewPayloadResult { + gas_used, + latency: np_latency, + persistence_wait: server_timings.as_ref().and_then(|t| t.persistence_wait), + execution_cache_wait: server_timings + .as_ref() + .map(|t| t.execution_cache_wait) + .unwrap_or_default(), + sparse_trie_wait: server_timings + .as_ref() + .map(|t| t.sparse_trie_wait) + .unwrap_or_default(), + }; + let fcu_start = Instant::now(); call_forkchoice_updated(&auth_provider, version, forkchoice_state, None).await?; + let fcu_latency = fcu_start.elapsed(); - let total_latency = start.elapsed(); - let fcu_latency = total_latency - new_payload_result.latency; + let total_latency = if server_timings.is_some() { + // When using server-side latency for newPayload, derive total 
from the + // independently measured components to avoid mixing server-side and + // client-side (network-inclusive) timings. + np_latency + fcu_latency + } else { + start.elapsed() + }; let combined_result = CombinedResult { block_number, gas_limit, diff --git a/bin/reth-bench/src/bench/new_payload_only.rs b/bin/reth-bench/src/bench/new_payload_only.rs index fd5cf86281..d9d3b9d9d2 100644 --- a/bin/reth-bench/src/bench/new_payload_only.rs +++ b/bin/reth-bench/src/bench/new_payload_only.rs @@ -8,7 +8,7 @@ use crate::{ NEW_PAYLOAD_OUTPUT_SUFFIX, }, }, - valid_payload::{block_to_new_payload, call_new_payload}, + valid_payload::{block_to_new_payload, call_new_payload_with_reth}, }; use alloy_provider::Provider; use clap::Parser; @@ -49,10 +49,15 @@ impl Command { auth_provider, mut next_block, is_optimism, - .. + use_reth_namespace, } = BenchContext::new(&self.benchmark, self.rpc_url).await?; let total_blocks = benchmark_mode.total_blocks(); + + if use_reth_namespace { + info!("Using reth_newPayload endpoint"); + } + let buffer_size = self.rpc_block_buffer_size; // Use a oneshot channel to propagate errors from the spawned task @@ -100,12 +105,28 @@ impl Command { debug!(target: "reth-bench", number=?block.header.number, "Sending payload to engine"); - let (version, params) = block_to_new_payload(block, is_optimism)?; + let (version, params, execution_data) = block_to_new_payload(block, is_optimism)?; let start = Instant::now(); - call_new_payload(&auth_provider, version, params).await?; + let reth_data = use_reth_namespace.then_some(execution_data); + let server_timings = + call_new_payload_with_reth(&auth_provider, version, params, reth_data).await?; - let new_payload_result = NewPayloadResult { gas_used, latency: start.elapsed() }; + let latency = + server_timings.as_ref().map(|t| t.latency).unwrap_or_else(|| start.elapsed()); + let new_payload_result = NewPayloadResult { + gas_used, + latency, + persistence_wait: server_timings.as_ref().and_then(|t| 
t.persistence_wait), + execution_cache_wait: server_timings + .as_ref() + .map(|t| t.execution_cache_wait) + .unwrap_or_default(), + sparse_trie_wait: server_timings + .as_ref() + .map(|t| t.sparse_trie_wait) + .unwrap_or_default(), + }; blocks_processed += 1; let progress = match total_blocks { Some(total) => format!("{blocks_processed}/{total}"), diff --git a/bin/reth-bench/src/bench/output.rs b/bin/reth-bench/src/bench/output.rs index 4ccfc96235..e2d5d086a3 100644 --- a/bin/reth-bench/src/bench/output.rs +++ b/bin/reth-bench/src/bench/output.rs @@ -27,6 +27,9 @@ pub(crate) struct GasRampPayloadFile { pub(crate) block_hash: B256, /// The params to pass to newPayload. pub(crate) params: serde_json::Value, + /// The execution data for `reth_newPayload`. + #[serde(skip_serializing_if = "Option::is_none", default)] + pub(crate) execution_data: Option, } /// This represents the results of a single `newPayload` call in the benchmark, containing the gas @@ -37,6 +40,12 @@ pub(crate) struct NewPayloadResult { pub(crate) gas_used: u64, /// The latency of the `newPayload` call. pub(crate) latency: Duration, + /// Time spent waiting for persistence. `None` when no persistence was in-flight. + pub(crate) persistence_wait: Option, + /// Time spent waiting for execution cache lock. + pub(crate) execution_cache_wait: Duration, + /// Time spent waiting for sparse trie lock. 
+ pub(crate) sparse_trie_wait: Duration, } impl NewPayloadResult { @@ -67,9 +76,12 @@ impl Serialize for NewPayloadResult { { // convert the time to microseconds let time = self.latency.as_micros(); - let mut state = serializer.serialize_struct("NewPayloadResult", 2)?; + let mut state = serializer.serialize_struct("NewPayloadResult", 5)?; state.serialize_field("gas_used", &self.gas_used)?; state.serialize_field("latency", &time)?; + state.serialize_field("persistence_wait", &self.persistence_wait.map(|d| d.as_micros()))?; + state.serialize_field("execution_cache_wait", &self.execution_cache_wait.as_micros())?; + state.serialize_field("sparse_trie_wait", &self.sparse_trie_wait.as_micros())?; state.end() } } @@ -126,7 +138,7 @@ impl Serialize for CombinedResult { let fcu_latency = self.fcu_latency.as_micros(); let new_payload_latency = self.new_payload_result.latency.as_micros(); let total_latency = self.total_latency.as_micros(); - let mut state = serializer.serialize_struct("CombinedResult", 7)?; + let mut state = serializer.serialize_struct("CombinedResult", 10)?; // flatten the new payload result because this is meant for CSV writing state.serialize_field("block_number", &self.block_number)?; @@ -136,6 +148,18 @@ impl Serialize for CombinedResult { state.serialize_field("new_payload_latency", &new_payload_latency)?; state.serialize_field("fcu_latency", &fcu_latency)?; state.serialize_field("total_latency", &total_latency)?; + state.serialize_field( + "persistence_wait", + &self.new_payload_result.persistence_wait.map(|d| d.as_micros()), + )?; + state.serialize_field( + "execution_cache_wait", + &self.new_payload_result.execution_cache_wait.as_micros(), + )?; + state.serialize_field( + "sparse_trie_wait", + &self.new_payload_result.sparse_trie_wait.as_micros(), + )?; state.end() } } diff --git a/bin/reth-bench/src/bench/replay_payloads.rs b/bin/reth-bench/src/bench/replay_payloads.rs index df55bef498..25d0d307f4 100644 --- 
a/bin/reth-bench/src/bench/replay_payloads.rs +++ b/bin/reth-bench/src/bench/replay_payloads.rs @@ -23,12 +23,15 @@ use crate::{ derive_ws_rpc_url, setup_persistence_subscription, PersistenceWaiter, }, }, - valid_payload::{call_forkchoice_updated, call_new_payload}, + valid_payload::{call_forkchoice_updated, call_new_payload_with_reth}, }; use alloy_primitives::B256; use alloy_provider::{ext::EngineApi, network::AnyNetwork, Provider, RootProvider}; use alloy_rpc_client::ClientBuilder; -use alloy_rpc_types_engine::{ExecutionPayloadEnvelopeV4, ForkchoiceState, JwtSecret}; +use alloy_rpc_types_engine::{ + CancunPayloadFields, ExecutionData, ExecutionPayloadEnvelopeV4, ExecutionPayloadSidecar, + ForkchoiceState, JwtSecret, PraguePayloadFields, +}; use clap::Parser; use eyre::Context; use reth_cli_runner::CliContext; @@ -124,6 +127,14 @@ pub struct Command { /// If not provided, derives from engine RPC URL by changing scheme to ws and port to 8546. #[arg(long, value_name = "WS_RPC_URL", verbatim_doc_comment)] ws_rpc_url: Option, + + /// Use `reth_newPayload` endpoint instead of `engine_newPayload*`. + /// + /// The `reth_newPayload` endpoint is a reth-specific extension that takes `ExecutionData` + /// directly, waits for persistence and cache updates to complete before processing, + /// and returns server-side timing breakdowns (latency, persistence wait, cache wait). + #[arg(long, default_value = "false", verbatim_doc_comment)] + reth_new_payload: bool, } /// A loaded payload ready for execution. 
@@ -163,6 +174,9 @@ impl Command { self.persistence_threshold ); } + if self.reth_new_payload { + info!("Using reth_newPayload endpoint"); + } // Set up waiter based on configured options // When both are set: wait at least wait_time, and also wait for persistence if needed @@ -248,7 +262,15 @@ impl Command { "Executing gas ramp payload (newPayload + FCU)" ); - call_new_payload(&auth_provider, payload.version, payload.file.params.clone()).await?; + let reth_data = + if self.reth_new_payload { payload.file.execution_data.clone() } else { None }; + let _ = call_new_payload_with_reth( + &auth_provider, + payload.version, + payload.file.params.clone(), + reth_data, + ) + .await?; let fcu_state = ForkchoiceState { head_block_hash: payload.file.block_hash, @@ -303,20 +325,47 @@ impl Command { "Sending newPayload" ); - let status = auth_provider - .new_payload_v4( - execution_payload.clone(), - vec![], - B256::ZERO, - envelope.execution_requests.to_vec(), - ) - .await?; + let params = serde_json::to_value(( + execution_payload.clone(), + Vec::::new(), + B256::ZERO, + envelope.execution_requests.to_vec(), + ))?; - let new_payload_result = NewPayloadResult { gas_used, latency: start.elapsed() }; + let reth_data = self.reth_new_payload.then(|| ExecutionData { + payload: execution_payload.clone().into(), + sidecar: ExecutionPayloadSidecar::v4( + CancunPayloadFields { + versioned_hashes: Vec::new(), + parent_beacon_block_root: B256::ZERO, + }, + PraguePayloadFields { requests: envelope.execution_requests.clone().into() }, + ), + }); - if !status.is_valid() { - return Err(eyre::eyre!("Payload rejected: {:?}", status)); - } + let server_timings = call_new_payload_with_reth( + &auth_provider, + EngineApiMessageVersion::V4, + params, + reth_data, + ) + .await?; + + let np_latency = + server_timings.as_ref().map(|t| t.latency).unwrap_or_else(|| start.elapsed()); + let new_payload_result = NewPayloadResult { + gas_used, + latency: np_latency, + persistence_wait: 
server_timings.as_ref().and_then(|t| t.persistence_wait), + execution_cache_wait: server_timings + .as_ref() + .map(|t| t.execution_cache_wait) + .unwrap_or_default(), + sparse_trie_wait: server_timings + .as_ref() + .map(|t| t.sparse_trie_wait) + .unwrap_or_default(), + }; let fcu_state = ForkchoiceState { head_block_hash: block_hash, @@ -326,10 +375,12 @@ impl Command { debug!(target: "reth-bench", method = "engine_forkchoiceUpdatedV3", ?fcu_state, "Sending forkchoiceUpdated"); + let fcu_start = Instant::now(); let fcu_result = auth_provider.fork_choice_updated_v3(fcu_state, None).await?; + let fcu_latency = fcu_start.elapsed(); - let total_latency = start.elapsed(); - let fcu_latency = total_latency - new_payload_result.latency; + let total_latency = + if server_timings.is_some() { np_latency + fcu_latency } else { start.elapsed() }; let combined_result = CombinedResult { block_number, @@ -352,7 +403,7 @@ impl Command { TotalGasRow { block_number, transaction_count, gas_used, time: current_duration }; results.push((gas_row, combined_result)); - debug!(target: "reth-bench", ?status, ?fcu_result, "Payload executed successfully"); + debug!(target: "reth-bench", ?fcu_result, "Payload executed successfully"); parent_hash = block_hash; } diff --git a/bin/reth-bench/src/valid_payload.rs b/bin/reth-bench/src/valid_payload.rs index 013dd65a07..8fc832e2fe 100644 --- a/bin/reth-bench/src/valid_payload.rs +++ b/bin/reth-bench/src/valid_payload.rs @@ -6,12 +6,14 @@ use alloy_eips::eip7685::Requests; use alloy_primitives::B256; use alloy_provider::{ext::EngineApi, network::AnyRpcBlock, Network, Provider}; use alloy_rpc_types_engine::{ - ExecutionPayload, ExecutionPayloadInputV2, ExecutionPayloadSidecar, ForkchoiceState, - ForkchoiceUpdated, PayloadAttributes, PayloadStatus, + ExecutionData, ExecutionPayload, ExecutionPayloadInputV2, ExecutionPayloadSidecar, + ForkchoiceState, ForkchoiceUpdated, PayloadAttributes, PayloadStatus, }; use alloy_transport::TransportResult; use 
op_alloy_rpc_types_engine::OpExecutionPayloadV4; use reth_node_api::EngineApiMessageVersion; +use serde::Deserialize; +use std::time::Duration; use tracing::{debug, error}; /// An extension trait for providers that implement the engine API, to wait for a VALID response. @@ -161,10 +163,13 @@ where } } +/// Converts an RPC block into versioned engine API params and an [`ExecutionData`]. +/// +/// Returns `(version, versioned_params, execution_data)`. pub(crate) fn block_to_new_payload( block: AnyRpcBlock, is_optimism: bool, -) -> eyre::Result<(EngineApiMessageVersion, serde_json::Value)> { +) -> eyre::Result<(EngineApiMessageVersion, serde_json::Value, ExecutionData)> { let block = block .into_inner() .map_header(|header| header.map(|h| h.into_header_with_defaults())) @@ -179,13 +184,19 @@ pub(crate) fn block_to_new_payload( payload_to_new_payload(payload, sidecar, is_optimism, block.withdrawals_root, None) } +/// Converts an execution payload and sidecar into versioned engine API params and an +/// [`ExecutionData`]. +/// +/// Returns `(version, versioned_params, execution_data)`. 
pub(crate) fn payload_to_new_payload( payload: ExecutionPayload, sidecar: ExecutionPayloadSidecar, is_optimism: bool, withdrawals_root: Option, target_version: Option, -) -> eyre::Result<(EngineApiMessageVersion, serde_json::Value)> { +) -> eyre::Result<(EngineApiMessageVersion, serde_json::Value, ExecutionData)> { + let execution_data = ExecutionData { payload: payload.clone(), sidecar: sidecar.clone() }; + let (version, params) = match payload { ExecutionPayload::V3(payload) => { let cancun = sidecar.cancun().unwrap(); @@ -244,7 +255,7 @@ pub(crate) fn payload_to_new_payload( } }; - Ok((version, params)) + Ok((version, params, execution_data)) } /// Calls the correct `engine_newPayload` method depending on the given [`ExecutionPayload`] and its @@ -252,32 +263,109 @@ pub(crate) fn payload_to_new_payload( /// /// # Panics /// If the given payload is a V3 payload, but a parent beacon block root is provided as `None`. +#[allow(dead_code)] pub(crate) async fn call_new_payload>( provider: P, version: EngineApiMessageVersion, params: serde_json::Value, -) -> TransportResult<()> { - let method = version.method_name(); +) -> TransportResult> { + call_new_payload_with_reth(provider, version, params, None).await +} - debug!(target: "reth-bench", method, "Sending newPayload"); +/// Response from `reth_newPayload` endpoint, which includes server-measured latency. +#[derive(Debug, Deserialize)] +struct RethPayloadStatus { + #[serde(flatten)] + status: PayloadStatus, + latency_us: u64, + #[serde(default)] + persistence_wait_us: Option, + #[serde(default)] + execution_cache_wait_us: u64, + #[serde(default)] + sparse_trie_wait_us: u64, +} - let mut status: PayloadStatus = provider.client().request(method, ¶ms).await?; +/// Server-side timing breakdown from `reth_newPayload` endpoint. +#[derive(Debug, Clone, Copy, Default)] +pub(crate) struct NewPayloadTimingBreakdown { + /// Server-side execution latency. + pub(crate) latency: Duration, + /// Time spent waiting for persistence. 
`None` when no persistence was in-flight. + pub(crate) persistence_wait: Option, + /// Time spent waiting for execution cache lock. + pub(crate) execution_cache_wait: Duration, + /// Time spent waiting for sparse trie lock. + pub(crate) sparse_trie_wait: Duration, +} - while !status.is_valid() { - if status.is_invalid() { - error!(target: "reth-bench", ?status, ?params, "Invalid {method}",); - return Err(alloy_json_rpc::RpcError::LocalUsageError(Box::new(std::io::Error::other( - format!("Invalid {method}: {status:?}"), - )))) +/// Calls either `engine_newPayload*` or `reth_newPayload` depending on whether +/// `reth_execution_data` is provided. +/// +/// When `reth_execution_data` is `Some`, uses the `reth_newPayload` endpoint which takes +/// `ExecutionData` directly and waits for persistence and cache updates to complete. +/// +/// Returns the server-reported timing breakdown when using the reth namespace, or `None` for +/// the standard engine namespace. +pub(crate) async fn call_new_payload_with_reth>( + provider: P, + version: EngineApiMessageVersion, + params: serde_json::Value, + reth_execution_data: Option, +) -> TransportResult> { + if let Some(execution_data) = reth_execution_data { + let method = "reth_newPayload"; + let reth_params = serde_json::to_value((execution_data.clone(),)) + .expect("ExecutionData serialization cannot fail"); + + debug!(target: "reth-bench", method, "Sending newPayload"); + + let mut resp: RethPayloadStatus = provider.client().request(method, &reth_params).await?; + + while !resp.status.is_valid() { + if resp.status.is_invalid() { + error!(target: "reth-bench", status=?resp.status, "Invalid {method}"); + return Err(alloy_json_rpc::RpcError::LocalUsageError(Box::new( + std::io::Error::other(format!("Invalid {method}: {:?}", resp.status)), + ))) + } + if resp.status.is_syncing() { + return Err(alloy_json_rpc::RpcError::UnsupportedFeature( + "invalid range: no canonical state found for parent of requested block", + )) + } + resp = 
provider.client().request(method, &reth_params).await?; } - if status.is_syncing() { - return Err(alloy_json_rpc::RpcError::UnsupportedFeature( - "invalid range: no canonical state found for parent of requested block", - )) + + Ok(Some(NewPayloadTimingBreakdown { + latency: Duration::from_micros(resp.latency_us), + persistence_wait: resp.persistence_wait_us.map(Duration::from_micros), + execution_cache_wait: Duration::from_micros(resp.execution_cache_wait_us), + sparse_trie_wait: Duration::from_micros(resp.sparse_trie_wait_us), + })) + } else { + let method = version.method_name(); + + debug!(target: "reth-bench", method, "Sending newPayload"); + + let mut status: PayloadStatus = provider.client().request(method, ¶ms).await?; + + while !status.is_valid() { + if status.is_invalid() { + error!(target: "reth-bench", ?status, ?params, "Invalid {method}",); + return Err(alloy_json_rpc::RpcError::LocalUsageError(Box::new( + std::io::Error::other(format!("Invalid {method}: {status:?}")), + ))) + } + if status.is_syncing() { + return Err(alloy_json_rpc::RpcError::UnsupportedFeature( + "invalid range: no canonical state found for parent of requested block", + )) + } + status = provider.client().request(method, ¶ms).await?; } - status = provider.client().request(method, ¶ms).await?; + Ok(None) } - Ok(()) } /// Calls the correct `engine_forkchoiceUpdated` method depending on the given diff --git a/crates/engine/primitives/src/message.rs b/crates/engine/primitives/src/message.rs index 5e7d97c8c0..17b69edabd 100644 --- a/crates/engine/primitives/src/message.rs +++ b/crates/engine/primitives/src/message.rs @@ -15,6 +15,7 @@ use futures::{future::Either, FutureExt, TryFutureExt}; use reth_errors::RethResult; use reth_payload_builder_primitives::PayloadBuilderError; use reth_payload_primitives::{EngineApiMessageVersion, PayloadTypes}; +use std::time::Duration; use tokio::sync::{mpsc::UnboundedSender, oneshot}; /// Type alias for backwards compat @@ -142,6 +143,20 @@ impl Future 
for PendingPayloadId { } } +/// Timing breakdown for `reth_newPayload` responses. +#[derive(Debug, Clone, Copy)] +pub struct NewPayloadTimings { + /// Server-side execution latency. + pub latency: Duration, + /// Time spent waiting for persistence to complete. + /// `None` when no persistence was in-flight. + pub persistence_wait: Option, + /// Time spent waiting for the execution cache lock. + pub execution_cache_wait: Duration, + /// Time spent waiting for the sparse trie lock. + pub sparse_trie_wait: Duration, +} + /// A message for the beacon engine from other components of the node (engine RPC API invoked by the /// consensus layer). #[derive(Debug)] @@ -153,6 +168,16 @@ pub enum BeaconEngineMessage { /// The sender for returning payload status result. tx: oneshot::Sender>, }, + /// Message with new payload used by `reth_newPayload` endpoint. + /// + /// Waits for persistence, execution cache, and sparse trie locks before processing, + /// and returns detailed timing breakdown alongside the payload status. + RethNewPayload { + /// The execution payload received by Engine API. + payload: Payload::ExecutionData, + /// The sender for returning payload status result and timing breakdown. + tx: oneshot::Sender>, + }, /// Message with updated forkchoice state. ForkchoiceUpdated { /// The updated forkchoice state. @@ -178,6 +203,15 @@ impl Display for BeaconEngineMessage { payload.block_hash() ) } + Self::RethNewPayload { payload, .. } => { + write!( + f, + "RethNewPayload(parent: {}, number: {}, hash: {})", + payload.parent_hash(), + payload.block_number(), + payload.block_hash() + ) + } Self::ForkchoiceUpdated { state, payload_attrs, .. } => { // we don't want to print the entire payload attributes, because for OP this // includes all txs @@ -223,6 +257,19 @@ where rx.await.map_err(|_| BeaconOnNewPayloadError::EngineUnavailable)? } + /// Sends a new payload message used by `reth_newPayload` endpoint. 
+ /// + /// Waits for persistence, execution cache, and sparse trie locks before processing, + /// and returns detailed timing breakdown alongside the payload status. + pub async fn reth_new_payload( + &self, + payload: Payload::ExecutionData, + ) -> Result<(PayloadStatus, NewPayloadTimings), BeaconOnNewPayloadError> { + let (tx, rx) = oneshot::channel(); + let _ = self.to_engine.send(BeaconEngineMessage::RethNewPayload { payload, tx }); + rx.await.map_err(|_| BeaconOnNewPayloadError::EngineUnavailable)? + } + /// Sends a forkchoice update message to the beacon consensus engine and waits for a response. /// /// See also diff --git a/crates/engine/tree/src/launch.rs b/crates/engine/tree/src/launch.rs index 5d1523e017..12120378b9 100644 --- a/crates/engine/tree/src/launch.rs +++ b/crates/engine/tree/src/launch.rs @@ -10,7 +10,7 @@ use crate::{ download::BasicBlockDownloader, engine::{EngineApiKind, EngineApiRequest, EngineApiRequestHandler, EngineHandler}, persistence::PersistenceHandle, - tree::{EngineApiTreeHandler, EngineValidator, TreeConfig}, + tree::{EngineApiTreeHandler, EngineValidator, TreeConfig, WaitForCaches}, }; use futures::Stream; use reth_consensus::FullConsensus; @@ -76,7 +76,7 @@ where N: ProviderNodeTypes, Client: BlockClient::Block> + 'static, S: Stream> + Send + Sync + Unpin + 'static, - V: EngineValidator, + V: EngineValidator + WaitForCaches, C: ConfigureEvm + 'static, { let downloader = BasicBlockDownloader::new(client, consensus.clone()); diff --git a/crates/engine/tree/src/tree/mod.rs b/crates/engine/tree/src/tree/mod.rs index 55aa1e525c..43a247fc25 100644 --- a/crates/engine/tree/src/tree/mod.rs +++ b/crates/engine/tree/src/tree/mod.rs @@ -19,7 +19,7 @@ use reth_chain_state::{ use reth_consensus::{Consensus, FullConsensus}; use reth_engine_primitives::{ BeaconEngineMessage, BeaconOnNewPayloadError, ConsensusEngineEvent, ExecutionPayload, - ForkchoiceStateTracker, OnForkChoiceUpdated, + ForkchoiceStateTracker, NewPayloadTimings, 
OnForkChoiceUpdated, }; use reth_errors::{ConsensusError, ProviderResult}; use reth_evm::ConfigureEvm; @@ -323,7 +323,7 @@ where + StorageSettingsCache, C: ConfigureEvm + 'static, T: PayloadTypes>, - V: EngineValidator, + V: EngineValidator + WaitForCaches, { /// Creates a new [`EngineApiTreeHandler`]. #[expect(clippy::too_many_arguments)] @@ -1555,6 +1555,81 @@ where // handle the event if any self.on_maybe_tree_event(maybe_event)?; } + BeaconEngineMessage::RethNewPayload { payload, tx } => { + let pending_persistence = self.persistence_state.rx.take(); + let validator = &self.payload_validator; + + debug!(target: "engine::tree", "Waiting for persistence and caches in parallel before processing reth_newPayload"); + let (persistence_tx, persistence_rx) = std::sync::mpsc::channel(); + if let Some((rx, start_time, _action)) = pending_persistence { + tokio::task::spawn_blocking(move || { + let start = Instant::now(); + let result = rx.recv().ok(); + let _ = persistence_tx.send(( + result, + start_time, + start.elapsed(), + )); + }); + } + + let cache_wait = validator.wait_for_caches(); + let persistence_result = persistence_rx.try_recv().ok(); + + let persistence_wait = + if let Some((result, start_time, wait_duration)) = + persistence_result + { + let _ = self + .on_persistence_complete(result.flatten(), start_time); + Some(wait_duration) + } else { + None + }; + + debug!( + target: "engine::tree", + persistence_wait = ?persistence_wait, + execution_cache_wait = ?cache_wait.execution_cache, + sparse_trie_wait = ?cache_wait.sparse_trie, + "Persistence finished and caches updated for reth_newPayload" + ); + + let start = Instant::now(); + let gas_used = payload.gas_used(); + let num_hash = payload.num_hash(); + let mut output = self.on_new_payload(payload); + let latency = start.elapsed(); + self.metrics.engine.new_payload.update_response_metrics( + start, + &mut self.metrics.engine.forkchoice_updated.latest_finish_at, + &output, + gas_used, + ); + + let maybe_event = 
+ output.as_mut().ok().and_then(|out| out.event.take()); + + let timings = NewPayloadTimings { + latency, + persistence_wait, + execution_cache_wait: cache_wait.execution_cache, + sparse_trie_wait: cache_wait.sparse_trie, + }; + if let Err(err) = + tx.send(output.map(|o| (o.outcome, timings)).map_err(|e| { + BeaconOnNewPayloadError::Internal(Box::new(e)) + })) + { + error!(target: "engine::tree", payload=?num_hash, elapsed=?start.elapsed(), "Failed to send event: {err:?}"); + self.metrics + .engine + .failed_new_payload_response_deliveries + .increment(1); + } + + self.on_maybe_tree_event(maybe_event)?; + } } } } diff --git a/crates/engine/tree/src/tree/payload_processor/mod.rs b/crates/engine/tree/src/tree/payload_processor/mod.rs index dedcd961c5..3a4ea5fb1d 100644 --- a/crates/engine/tree/src/tree/payload_processor/mod.rs +++ b/crates/engine/tree/src/tree/payload_processor/mod.rs @@ -50,6 +50,7 @@ use std::{ mpsc::{self, channel}, Arc, }, + time::Duration, }; use tracing::{debug, debug_span, instrument, warn, Span}; @@ -62,6 +63,26 @@ pub mod sparse_trie; use preserved_sparse_trie::{PreservedSparseTrie, SharedPreservedSparseTrie}; +/// Result of waiting for caches to become available. +#[derive(Debug, Clone, Copy, Default)] +pub struct CacheWaitDurations { + /// Time spent waiting for the execution cache lock. + pub execution_cache: Duration, + /// Time spent waiting for the sparse trie lock. + pub sparse_trie: Duration, +} + +/// Trait for types that can wait for execution cache and sparse trie locks to become available. +/// +/// This is used by `reth_newPayload` endpoint to ensure that payload processing +/// waits for any ongoing operations to complete before starting. +pub trait WaitForCaches { + /// Waits for persistence and cache updates to complete. + /// + /// Returns the time spent waiting for each cache separately. + fn wait_for_caches(&self) -> CacheWaitDurations; +} + /// Default parallelism thresholds to use with the [`ParallelSparseTrie`]. 
/// /// These values were determined by performing benchmarks using gradually increasing values to judge @@ -178,6 +199,46 @@ where } } +impl WaitForCaches for PayloadProcessor +where + Evm: ConfigureEvm, +{ + fn wait_for_caches(&self) -> CacheWaitDurations { + debug!(target: "engine::tree::payload_processor", "Waiting for execution cache and sparse trie locks"); + + // Wait for both caches in parallel on the executor's blocking pool + let execution_cache = self.execution_cache.clone(); + let sparse_trie = self.sparse_state_trie.clone(); + + // Use channels and spawn_blocking instead of std::thread::spawn + let (execution_tx, execution_rx) = std::sync::mpsc::channel(); + let (sparse_trie_tx, sparse_trie_rx) = std::sync::mpsc::channel(); + + self.executor.spawn_blocking(move || { + let _ = execution_tx.send(execution_cache.wait_for_availability()); + }); + self.executor.spawn_blocking(move || { + let _ = sparse_trie_tx.send(sparse_trie.wait_for_availability()); + }); + + let execution_cache_duration = + execution_rx.recv().expect("execution cache wait task failed to send result"); + let sparse_trie_duration = + sparse_trie_rx.recv().expect("sparse trie wait task failed to send result"); + + debug!( + target: "engine::tree::payload_processor", + ?execution_cache_duration, + ?sparse_trie_duration, + "Execution cache and sparse trie locks acquired" + ); + CacheWaitDurations { + execution_cache: execution_cache_duration, + sparse_trie: sparse_trie_duration, + } + } +} + impl PayloadProcessor where N: NodePrimitives, @@ -966,6 +1027,27 @@ impl PayloadExecutionCache { self.inner.write().take(); } + /// Waits until the execution cache becomes available for use. + /// + /// This acquires a write lock to ensure exclusive access, then immediately releases it. + /// This is useful for synchronization before starting payload processing. + /// + /// Returns the time spent waiting for the lock. 
+ pub fn wait_for_availability(&self) -> Duration { + let start = Instant::now(); + // Acquire write lock to wait for any current holders to finish + let _guard = self.inner.write(); + let elapsed = start.elapsed(); + if elapsed.as_millis() > 5 { + debug!( + target: "engine::tree::payload_processor", + blocked_for=?elapsed, + "Waited for execution cache to become available" + ); + } + elapsed + } + /// Updates the cache with a closure that has exclusive access to the guard. /// This ensures that all cache operations happen atomically. /// diff --git a/crates/engine/tree/src/tree/payload_processor/preserved_sparse_trie.rs b/crates/engine/tree/src/tree/payload_processor/preserved_sparse_trie.rs index e3c47b1f50..6a147691af 100644 --- a/crates/engine/tree/src/tree/payload_processor/preserved_sparse_trie.rs +++ b/crates/engine/tree/src/tree/payload_processor/preserved_sparse_trie.rs @@ -3,7 +3,7 @@ use alloy_primitives::B256; use parking_lot::Mutex; use reth_trie_sparse::SparseStateTrie; -use std::sync::Arc; +use std::{sync::Arc, time::Instant}; use tracing::debug; /// Type alias for the sparse trie type used in preservation. @@ -28,6 +28,27 @@ impl SharedPreservedSparseTrie { pub(super) fn lock(&self) -> PreservedTrieGuard<'_> { PreservedTrieGuard(self.0.lock()) } + + /// Waits until the sparse trie lock becomes available. + /// + /// This acquires and immediately releases the lock, ensuring that any + /// ongoing operations complete before returning. Useful for synchronization + /// before starting payload processing. + /// + /// Returns the time spent waiting for the lock. 
+ pub(super) fn wait_for_availability(&self) -> std::time::Duration { + let start = Instant::now(); + let _guard = self.0.lock(); + let elapsed = start.elapsed(); + if elapsed.as_millis() > 5 { + debug!( + target: "engine::tree::payload_processor", + blocked_for=?elapsed, + "Waited for preserved sparse trie to become available" + ); + } + elapsed + } } /// Guard that holds the lock on the preserved trie. diff --git a/crates/engine/tree/src/tree/payload_validator.rs b/crates/engine/tree/src/tree/payload_validator.rs index 83da84b182..9d5d5e6614 100644 --- a/crates/engine/tree/src/tree/payload_validator.rs +++ b/crates/engine/tree/src/tree/payload_validator.rs @@ -4,11 +4,11 @@ use crate::tree::{ cached_state::CachedStateProvider, error::{InsertBlockError, InsertBlockErrorKind, InsertPayloadError}, instrumented_state::InstrumentedStateProvider, - payload_processor::PayloadProcessor, + payload_processor::{CacheWaitDurations, PayloadProcessor}, precompile_cache::{CachedPrecompile, CachedPrecompileMetrics, PrecompileCacheMap}, sparse_trie::StateRootComputeOutcome, EngineApiMetrics, EngineApiTreeState, ExecutionEnv, PayloadHandle, StateProviderBuilder, - StateProviderDatabase, TreeConfig, + StateProviderDatabase, TreeConfig, WaitForCaches, }; use alloy_consensus::transaction::{Either, TxHashRef}; use alloy_eip7928::BlockAccessList; @@ -1585,6 +1585,15 @@ where } } +impl WaitForCaches for BasicEngineValidator +where + Evm: ConfigureEvm, +{ + fn wait_for_caches(&self) -> CacheWaitDurations { + self.payload_processor.wait_for_caches() + } +} + /// Enum representing either block or payload being validated. 
#[derive(Debug)] pub enum BlockOrPayload { diff --git a/crates/engine/util/src/engine_store.rs b/crates/engine/util/src/engine_store.rs index a79504db30..37d8cff24b 100644 --- a/crates/engine/util/src/engine_store.rs +++ b/crates/engine/util/src/engine_store.rs @@ -77,7 +77,8 @@ impl EngineMessageStore { })?, )?; } - BeaconEngineMessage::NewPayload { payload, tx: _tx } => { + BeaconEngineMessage::NewPayload { payload, .. } | + BeaconEngineMessage::RethNewPayload { payload, .. } => { let filename = format!("{}-new_payload-{}.json", timestamp, payload.block_hash()); fs::write( self.path.join(filename), diff --git a/crates/node/builder/src/rpc.rs b/crates/node/builder/src/rpc.rs index ce84852e7c..9015e567fa 100644 --- a/crates/node/builder/src/rpc.rs +++ b/crates/node/builder/src/rpc.rs @@ -1,6 +1,7 @@ //! Builder support for rpc components. pub use jsonrpsee::server::middleware::rpc::{RpcService, RpcServiceBuilder}; +use reth_engine_tree::tree::WaitForCaches; pub use reth_engine_tree::tree::{BasicEngineValidator, EngineValidator}; pub use reth_rpc_builder::{middleware::RethRpcMiddleware, Identity, Stack}; pub use reth_trie_db::ChangesetCache; @@ -1278,10 +1279,8 @@ pub trait PayloadValidatorBuilder: Send + Sync + Clone /// for block execution, state validation, and fork handling. pub trait EngineValidatorBuilder: Send + Sync + Clone { /// The tree validator type that will be used by the consensus engine. - type EngineValidator: EngineValidator< - ::Payload, - ::Primitives, - >; + type EngineValidator: EngineValidator<::Payload, ::Primitives> + + WaitForCaches; /// Builds the tree validator for the consensus engine. /// diff --git a/crates/node/core/src/args/benchmark_args.rs b/crates/node/core/src/args/benchmark_args.rs index f41926d3af..e6955a8f9f 100644 --- a/crates/node/core/src/args/benchmark_args.rs +++ b/crates/node/core/src/args/benchmark_args.rs @@ -58,6 +58,14 @@ pub struct BenchmarkArgs { /// The path to the output directory for granular benchmark results. 
#[arg(long, short, value_name = "BENCHMARK_OUTPUT", verbatim_doc_comment)] pub output: Option, + + /// Use `reth_newPayload` endpoint instead of `engine_newPayload*`. + /// + /// The `reth_newPayload` endpoint is a reth-specific extension that takes `ExecutionData` + /// directly, waits for persistence and cache updates to complete before processing, + /// and returns server-side timing breakdowns (latency, persistence wait, cache wait). + #[arg(long, default_value = "false", verbatim_doc_comment)] + pub reth_new_payload: bool, } #[cfg(test)] diff --git a/crates/rpc/rpc-api/Cargo.toml b/crates/rpc/rpc-api/Cargo.toml index 9287c17465..26d236086d 100644 --- a/crates/rpc/rpc-api/Cargo.toml +++ b/crates/rpc/rpc-api/Cargo.toml @@ -39,6 +39,7 @@ alloy-genesis.workspace = true # misc jsonrpsee = { workspace = true, features = ["server", "macros"] } +serde = { workspace = true, features = ["derive"] } serde_json.workspace = true [features] diff --git a/crates/rpc/rpc-api/src/lib.rs b/crates/rpc/rpc-api/src/lib.rs index 9c3a4baa03..964437012b 100644 --- a/crates/rpc/rpc-api/src/lib.rs +++ b/crates/rpc/rpc-api/src/lib.rs @@ -24,6 +24,7 @@ mod miner; mod net; mod otterscan; mod reth; +mod reth_engine; mod rpc; mod testing; mod trace; @@ -47,6 +48,7 @@ pub mod servers { net::NetApiServer, otterscan::OtterscanServer, reth::RethApiServer, + reth_engine::{RethEngineApiServer, RethPayloadStatus}, rpc::RpcApiServer, testing::TestingApiServer, trace::TraceApiServer, @@ -78,6 +80,7 @@ pub mod clients { net::NetApiClient, otterscan::OtterscanClient, reth::RethApiClient, + reth_engine::RethEngineApiClient, rpc::RpcApiServer, testing::TestingApiClient, trace::TraceApiClient, diff --git a/crates/rpc/rpc-api/src/reth_engine.rs b/crates/rpc/rpc-api/src/reth_engine.rs new file mode 100644 index 0000000000..96297c78ef --- /dev/null +++ b/crates/rpc/rpc-api/src/reth_engine.rs @@ -0,0 +1,39 @@ +//! Reth-specific engine API extensions. 
+ +use alloy_rpc_types_engine::{ExecutionData, PayloadStatus}; +use jsonrpsee::{core::RpcResult, proc_macros::rpc}; +use serde::{Deserialize, Serialize}; + +/// Reth-specific payload status that includes server-measured execution latency. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct RethPayloadStatus { + /// The standard payload status. + #[serde(flatten)] + pub status: PayloadStatus, + /// Server-side execution latency in microseconds. + pub latency_us: u64, + /// Time spent waiting for persistence to complete, in microseconds. + /// `None` when no persistence was in-flight. + #[serde(skip_serializing_if = "Option::is_none")] + pub persistence_wait_us: Option, + /// Time spent waiting for the execution cache lock, in microseconds. + pub execution_cache_wait_us: u64, + /// Time spent waiting for the sparse trie lock, in microseconds. + pub sparse_trie_wait_us: u64, +} + +/// Reth-specific engine API extensions. +/// +/// This trait provides a `reth_newPayload` endpoint that takes `ExecutionData` directly +/// (payload + sidecar), waiting for persistence and cache locks before processing. +/// +/// Responses include timing breakdowns with server-measured execution latency. +#[cfg_attr(not(feature = "client"), rpc(server, namespace = "reth"))] +#[cfg_attr(feature = "client", rpc(server, client, namespace = "reth"))] +pub trait RethEngineApi { + /// Reth-specific newPayload that takes `ExecutionData` directly. + /// + /// Waits for persistence, execution cache, and sparse trie locks before processing. 
+ #[method(name = "newPayload")] + async fn reth_new_payload(&self, payload: ExecutionData) -> RpcResult; +} diff --git a/crates/rpc/rpc-engine-api/src/engine_api.rs b/crates/rpc/rpc-engine-api/src/engine_api.rs index eea88fba41..84c56dacba 100644 --- a/crates/rpc/rpc-engine-api/src/engine_api.rs +++ b/crates/rpc/rpc-engine-api/src/engine_api.rs @@ -26,7 +26,9 @@ use reth_payload_primitives::{ PayloadOrAttributes, PayloadTypes, }; use reth_primitives_traits::{Block, BlockBody}; -use reth_rpc_api::{EngineApiServer, IntoEngineApiRpcModule}; +use reth_rpc_api::{ + EngineApiServer, IntoEngineApiRpcModule, RethEngineApiServer, RethPayloadStatus, +}; use reth_storage_api::{BlockReader, HeaderProvider, StateProviderFactory}; use reth_tasks::Runtime; use reth_transaction_pool::TransactionPool; @@ -257,6 +259,34 @@ where pub fn accept_execution_requests_hash(&self) -> bool { self.inner.accept_execution_requests_hash } + + /// Waits for persistence, execution cache, and sparse trie locks before processing. + /// + /// Used by `reth_newPayload` endpoint. + pub async fn reth_new_payload( + &self, + payload: PayloadT::ExecutionData, + ) -> EngineApiResult { + let (status, timings) = self.inner.beacon_consensus.reth_new_payload(payload).await?; + Ok(RethPayloadStatus { + status, + latency_us: timings.latency.as_micros() as u64, + persistence_wait_us: timings.persistence_wait.map(|d| d.as_micros() as u64), + execution_cache_wait_us: timings.execution_cache_wait.as_micros() as u64, + sparse_trie_wait_us: timings.sparse_trie_wait.as_micros() as u64, + }) + } + + /// Metered version of `reth_new_payload`. + pub async fn reth_new_payload_metered( + &self, + payload: PayloadT::ExecutionData, + ) -> RpcResult { + let start = Instant::now(); + let res = Self::reth_new_payload(self, payload).await; + self.inner.metrics.latency.new_payload_v1.record(start.elapsed()); + Ok(res?) 
+ } } impl @@ -1283,14 +1313,40 @@ where } } +/// Implementation of `RethEngineApiServer` under the `reth_` namespace. +/// +/// Waits for execution cache and sparse trie locks before processing. +#[async_trait] +impl RethEngineApiServer + for EngineApi +where + Provider: HeaderProvider + BlockReader + StateProviderFactory + 'static, + EngineT: EngineTypes, + Pool: TransactionPool + 'static, + Validator: EngineApiValidator, + ChainSpec: EthereumHardforks + Send + Sync + 'static, +{ + async fn reth_new_payload(&self, payload: ExecutionData) -> RpcResult { + trace!(target: "rpc::engine", "Serving reth_newPayload"); + self.reth_new_payload_metered(payload).await + } +} + impl IntoEngineApiRpcModule for EngineApi where EngineT: EngineTypes, - Self: EngineApiServer, + Self: EngineApiServer + RethEngineApiServer, { fn into_rpc_module(self) -> RpcModule<()> { - self.into_rpc().remove_context() + let mut module = EngineApiServer::::into_rpc(self.clone()).remove_context(); + + // Merge reth_newPayload endpoint + module + .merge(RethEngineApiServer::into_rpc(self).remove_context()) + .expect("No conflicting methods"); + + module } }