feat(rpc, reth-bench): reth_newPayload methods for reth-bench (#22133)

Co-authored-by: Amp <amp@ampcode.com>
Co-authored-by: Georgios Konstantopoulos <me@gakonst.com>
This commit is contained in:
Alexey Shekhirin
2026-02-16 11:11:13 +00:00
committed by GitHub
parent 8db125daff
commit 881500e592
21 changed files with 655 additions and 74 deletions

View File

@@ -15,6 +15,7 @@ use futures::{future::Either, FutureExt, TryFutureExt};
use reth_errors::RethResult;
use reth_payload_builder_primitives::PayloadBuilderError;
use reth_payload_primitives::{EngineApiMessageVersion, PayloadTypes};
use std::time::Duration;
use tokio::sync::{mpsc::UnboundedSender, oneshot};
/// Type alias for backwards compat
@@ -142,6 +143,20 @@ impl Future for PendingPayloadId {
}
}
/// Timing breakdown for `reth_newPayload` responses.
#[derive(Debug, Clone, Copy)]
pub struct NewPayloadTimings {
/// Server-side execution latency.
pub latency: Duration,
/// Time spent waiting for persistence to complete.
/// `None` when no persistence was in-flight.
pub persistence_wait: Option<Duration>,
/// Time spent waiting for the execution cache lock.
pub execution_cache_wait: Duration,
/// Time spent waiting for the sparse trie lock.
pub sparse_trie_wait: Duration,
}
/// A message for the beacon engine from other components of the node (engine RPC API invoked by the
/// consensus layer).
#[derive(Debug)]
@@ -153,6 +168,16 @@ pub enum BeaconEngineMessage<Payload: PayloadTypes> {
/// The sender for returning payload status result.
tx: oneshot::Sender<Result<PayloadStatus, BeaconOnNewPayloadError>>,
},
/// Message with new payload used by `reth_newPayload` endpoint.
///
/// Waits for persistence, execution cache, and sparse trie locks before processing,
/// and returns detailed timing breakdown alongside the payload status.
RethNewPayload {
/// The execution payload received by Engine API.
payload: Payload::ExecutionData,
/// The sender for returning payload status result and timing breakdown.
tx: oneshot::Sender<Result<(PayloadStatus, NewPayloadTimings), BeaconOnNewPayloadError>>,
},
/// Message with updated forkchoice state.
ForkchoiceUpdated {
/// The updated forkchoice state.
@@ -178,6 +203,15 @@ impl<Payload: PayloadTypes> Display for BeaconEngineMessage<Payload> {
payload.block_hash()
)
}
Self::RethNewPayload { payload, .. } => {
write!(
f,
"RethNewPayload(parent: {}, number: {}, hash: {})",
payload.parent_hash(),
payload.block_number(),
payload.block_hash()
)
}
Self::ForkchoiceUpdated { state, payload_attrs, .. } => {
// we don't want to print the entire payload attributes, because for OP this
// includes all txs
@@ -223,6 +257,19 @@ where
rx.await.map_err(|_| BeaconOnNewPayloadError::EngineUnavailable)?
}
/// Sends a new payload message used by `reth_newPayload` endpoint.
///
/// Waits for persistence, execution cache, and sparse trie locks before processing,
/// and returns detailed timing breakdown alongside the payload status.
pub async fn reth_new_payload(
&self,
payload: Payload::ExecutionData,
) -> Result<(PayloadStatus, NewPayloadTimings), BeaconOnNewPayloadError> {
let (tx, rx) = oneshot::channel();
let _ = self.to_engine.send(BeaconEngineMessage::RethNewPayload { payload, tx });
rx.await.map_err(|_| BeaconOnNewPayloadError::EngineUnavailable)?
}
/// Sends a forkchoice update message to the beacon consensus engine and waits for a response.
///
/// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/shanghai.md#engine_forkchoiceupdatedv2>

View File

@@ -10,7 +10,7 @@ use crate::{
download::BasicBlockDownloader,
engine::{EngineApiKind, EngineApiRequest, EngineApiRequestHandler, EngineHandler},
persistence::PersistenceHandle,
tree::{EngineApiTreeHandler, EngineValidator, TreeConfig},
tree::{EngineApiTreeHandler, EngineValidator, TreeConfig, WaitForCaches},
};
use futures::Stream;
use reth_consensus::FullConsensus;
@@ -76,7 +76,7 @@ where
N: ProviderNodeTypes,
Client: BlockClient<Block = <N::Primitives as NodePrimitives>::Block> + 'static,
S: Stream<Item = BeaconEngineMessage<N::Payload>> + Send + Sync + Unpin + 'static,
V: EngineValidator<N::Payload>,
V: EngineValidator<N::Payload> + WaitForCaches,
C: ConfigureEvm<Primitives = N::Primitives> + 'static,
{
let downloader = BasicBlockDownloader::new(client, consensus.clone());

View File

@@ -19,7 +19,7 @@ use reth_chain_state::{
use reth_consensus::{Consensus, FullConsensus};
use reth_engine_primitives::{
BeaconEngineMessage, BeaconOnNewPayloadError, ConsensusEngineEvent, ExecutionPayload,
ForkchoiceStateTracker, OnForkChoiceUpdated,
ForkchoiceStateTracker, NewPayloadTimings, OnForkChoiceUpdated,
};
use reth_errors::{ConsensusError, ProviderResult};
use reth_evm::ConfigureEvm;
@@ -323,7 +323,7 @@ where
+ StorageSettingsCache,
C: ConfigureEvm<Primitives = N> + 'static,
T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>,
V: EngineValidator<T>,
V: EngineValidator<T> + WaitForCaches,
{
/// Creates a new [`EngineApiTreeHandler`].
#[expect(clippy::too_many_arguments)]
@@ -1555,6 +1555,81 @@ where
// handle the event if any
self.on_maybe_tree_event(maybe_event)?;
}
BeaconEngineMessage::RethNewPayload { payload, tx } => {
let pending_persistence = self.persistence_state.rx.take();
let validator = &self.payload_validator;
debug!(target: "engine::tree", "Waiting for persistence and caches in parallel before processing reth_newPayload");
let (persistence_tx, persistence_rx) = std::sync::mpsc::channel();
if let Some((rx, start_time, _action)) = pending_persistence {
tokio::task::spawn_blocking(move || {
let start = Instant::now();
let result = rx.recv().ok();
let _ = persistence_tx.send((
result,
start_time,
start.elapsed(),
));
});
}
let cache_wait = validator.wait_for_caches();
let persistence_result = persistence_rx.try_recv().ok();
let persistence_wait =
if let Some((result, start_time, wait_duration)) =
persistence_result
{
let _ = self
.on_persistence_complete(result.flatten(), start_time);
Some(wait_duration)
} else {
None
};
debug!(
target: "engine::tree",
persistence_wait = ?persistence_wait,
execution_cache_wait = ?cache_wait.execution_cache,
sparse_trie_wait = ?cache_wait.sparse_trie,
"Peresistence finished and caches updated for reth_newPayload"
);
let start = Instant::now();
let gas_used = payload.gas_used();
let num_hash = payload.num_hash();
let mut output = self.on_new_payload(payload);
let latency = start.elapsed();
self.metrics.engine.new_payload.update_response_metrics(
start,
&mut self.metrics.engine.forkchoice_updated.latest_finish_at,
&output,
gas_used,
);
let maybe_event =
output.as_mut().ok().and_then(|out| out.event.take());
let timings = NewPayloadTimings {
latency,
persistence_wait,
execution_cache_wait: cache_wait.execution_cache,
sparse_trie_wait: cache_wait.sparse_trie,
};
if let Err(err) =
tx.send(output.map(|o| (o.outcome, timings)).map_err(|e| {
BeaconOnNewPayloadError::Internal(Box::new(e))
}))
{
error!(target: "engine::tree", payload=?num_hash, elapsed=?start.elapsed(), "Failed to send event: {err:?}");
self.metrics
.engine
.failed_new_payload_response_deliveries
.increment(1);
}
self.on_maybe_tree_event(maybe_event)?;
}
}
}
}

View File

@@ -50,6 +50,7 @@ use std::{
mpsc::{self, channel},
Arc,
},
time::Duration,
};
use tracing::{debug, debug_span, instrument, warn, Span};
@@ -62,6 +63,26 @@ pub mod sparse_trie;
use preserved_sparse_trie::{PreservedSparseTrie, SharedPreservedSparseTrie};
/// Result of waiting for caches to become available.
#[derive(Debug, Clone, Copy, Default)]
pub struct CacheWaitDurations {
/// Time spent waiting for the execution cache lock.
pub execution_cache: Duration,
/// Time spent waiting for the sparse trie lock.
pub sparse_trie: Duration,
}
/// Trait for types that can wait for execution cache and sparse trie locks to become available.
///
/// This is used by `reth_newPayload` endpoint to ensure that payload processing
/// waits for any ongoing operations to complete before starting.
pub trait WaitForCaches {
/// Waits for persistence and cache updates to complete.
///
/// Returns the time spent waiting for each cache separately.
fn wait_for_caches(&self) -> CacheWaitDurations;
}
/// Default parallelism thresholds to use with the [`ParallelSparseTrie`].
///
/// These values were determined by performing benchmarks using gradually increasing values to judge
@@ -178,6 +199,46 @@ where
}
}
impl<Evm> WaitForCaches for PayloadProcessor<Evm>
where
Evm: ConfigureEvm,
{
fn wait_for_caches(&self) -> CacheWaitDurations {
debug!(target: "engine::tree::payload_processor", "Waiting for execution cache and sparse trie locks");
// Wait for both caches in parallel using std threads
let execution_cache = self.execution_cache.clone();
let sparse_trie = self.sparse_state_trie.clone();
// Use channels and spawn_blocking instead of std::thread::spawn
let (execution_tx, execution_rx) = std::sync::mpsc::channel();
let (sparse_trie_tx, sparse_trie_rx) = std::sync::mpsc::channel();
self.executor.spawn_blocking(move || {
let _ = execution_tx.send(execution_cache.wait_for_availability());
});
self.executor.spawn_blocking(move || {
let _ = sparse_trie_tx.send(sparse_trie.wait_for_availability());
});
let execution_cache_duration =
execution_rx.recv().expect("execution cache wait task failed to send result");
let sparse_trie_duration =
sparse_trie_rx.recv().expect("sparse trie wait task failed to send result");
debug!(
target: "engine::tree::payload_processor",
?execution_cache_duration,
?sparse_trie_duration,
"Execution cache and sparse trie locks acquired"
);
CacheWaitDurations {
execution_cache: execution_cache_duration,
sparse_trie: sparse_trie_duration,
}
}
}
impl<N, Evm> PayloadProcessor<Evm>
where
N: NodePrimitives,
@@ -966,6 +1027,27 @@ impl PayloadExecutionCache {
self.inner.write().take();
}
/// Waits until the execution cache becomes available for use.
///
/// This acquires a write lock to ensure exclusive access, then immediately releases it.
/// This is useful for synchronization before starting payload processing.
///
/// Returns the time spent waiting for the lock.
pub fn wait_for_availability(&self) -> Duration {
let start = Instant::now();
// Acquire write lock to wait for any current holders to finish
let _guard = self.inner.write();
let elapsed = start.elapsed();
if elapsed.as_millis() > 5 {
debug!(
target: "engine::tree::payload_processor",
blocked_for=?elapsed,
"Waited for execution cache to become available"
);
}
elapsed
}
/// Updates the cache with a closure that has exclusive access to the guard.
/// This ensures that all cache operations happen atomically.
///

View File

@@ -3,7 +3,7 @@
use alloy_primitives::B256;
use parking_lot::Mutex;
use reth_trie_sparse::SparseStateTrie;
use std::sync::Arc;
use std::{sync::Arc, time::Instant};
use tracing::debug;
/// Type alias for the sparse trie type used in preservation.
@@ -28,6 +28,27 @@ impl SharedPreservedSparseTrie {
pub(super) fn lock(&self) -> PreservedTrieGuard<'_> {
PreservedTrieGuard(self.0.lock())
}
/// Waits until the sparse trie lock becomes available.
///
/// This acquires and immediately releases the lock, ensuring that any
/// ongoing operations complete before returning. Useful for synchronization
/// before starting payload processing.
///
/// Returns the time spent waiting for the lock.
pub(super) fn wait_for_availability(&self) -> std::time::Duration {
let start = Instant::now();
let _guard = self.0.lock();
let elapsed = start.elapsed();
if elapsed.as_millis() > 5 {
debug!(
target: "engine::tree::payload_processor",
blocked_for=?elapsed,
"Waited for preserved sparse trie to become available"
);
}
elapsed
}
}
/// Guard that holds the lock on the preserved trie.

View File

@@ -4,11 +4,11 @@ use crate::tree::{
cached_state::CachedStateProvider,
error::{InsertBlockError, InsertBlockErrorKind, InsertPayloadError},
instrumented_state::InstrumentedStateProvider,
payload_processor::PayloadProcessor,
payload_processor::{CacheWaitDurations, PayloadProcessor},
precompile_cache::{CachedPrecompile, CachedPrecompileMetrics, PrecompileCacheMap},
sparse_trie::StateRootComputeOutcome,
EngineApiMetrics, EngineApiTreeState, ExecutionEnv, PayloadHandle, StateProviderBuilder,
StateProviderDatabase, TreeConfig,
StateProviderDatabase, TreeConfig, WaitForCaches,
};
use alloy_consensus::transaction::{Either, TxHashRef};
use alloy_eip7928::BlockAccessList;
@@ -1585,6 +1585,15 @@ where
}
}
impl<P, Evm, V> WaitForCaches for BasicEngineValidator<P, Evm, V>
where
Evm: ConfigureEvm,
{
fn wait_for_caches(&self) -> CacheWaitDurations {
self.payload_processor.wait_for_caches()
}
}
/// Enum representing either block or payload being validated.
#[derive(Debug)]
pub enum BlockOrPayload<T: PayloadTypes> {

View File

@@ -77,7 +77,8 @@ impl EngineMessageStore {
})?,
)?;
}
BeaconEngineMessage::NewPayload { payload, tx: _tx } => {
BeaconEngineMessage::NewPayload { payload, .. } |
BeaconEngineMessage::RethNewPayload { payload, .. } => {
let filename = format!("{}-new_payload-{}.json", timestamp, payload.block_hash());
fs::write(
self.path.join(filename),

View File

@@ -1,6 +1,7 @@
//! Builder support for rpc components.
pub use jsonrpsee::server::middleware::rpc::{RpcService, RpcServiceBuilder};
use reth_engine_tree::tree::WaitForCaches;
pub use reth_engine_tree::tree::{BasicEngineValidator, EngineValidator};
pub use reth_rpc_builder::{middleware::RethRpcMiddleware, Identity, Stack};
pub use reth_trie_db::ChangesetCache;
@@ -1278,10 +1279,8 @@ pub trait PayloadValidatorBuilder<Node: FullNodeComponents>: Send + Sync + Clone
/// for block execution, state validation, and fork handling.
pub trait EngineValidatorBuilder<Node: FullNodeComponents>: Send + Sync + Clone {
/// The tree validator type that will be used by the consensus engine.
type EngineValidator: EngineValidator<
<Node::Types as NodeTypes>::Payload,
<Node::Types as NodeTypes>::Primitives,
>;
type EngineValidator: EngineValidator<<Node::Types as NodeTypes>::Payload, <Node::Types as NodeTypes>::Primitives>
+ WaitForCaches;
/// Builds the tree validator for the consensus engine.
///

View File

@@ -58,6 +58,14 @@ pub struct BenchmarkArgs {
/// The path to the output directory for granular benchmark results.
#[arg(long, short, value_name = "BENCHMARK_OUTPUT", verbatim_doc_comment)]
pub output: Option<PathBuf>,
/// Use `reth_newPayload` endpoint instead of `engine_newPayload*`.
///
/// The `reth_newPayload` endpoint is a reth-specific extension that takes `ExecutionData`
/// directly, waits for persistence and cache updates to complete before processing,
/// and returns server-side timing breakdowns (latency, persistence wait, cache wait).
#[arg(long, default_value = "false", verbatim_doc_comment)]
pub reth_new_payload: bool,
}
#[cfg(test)]

View File

@@ -39,6 +39,7 @@ alloy-genesis.workspace = true
# misc
jsonrpsee = { workspace = true, features = ["server", "macros"] }
serde = { workspace = true, features = ["derive"] }
serde_json.workspace = true
[features]

View File

@@ -24,6 +24,7 @@ mod miner;
mod net;
mod otterscan;
mod reth;
mod reth_engine;
mod rpc;
mod testing;
mod trace;
@@ -47,6 +48,7 @@ pub mod servers {
net::NetApiServer,
otterscan::OtterscanServer,
reth::RethApiServer,
reth_engine::{RethEngineApiServer, RethPayloadStatus},
rpc::RpcApiServer,
testing::TestingApiServer,
trace::TraceApiServer,
@@ -78,6 +80,7 @@ pub mod clients {
net::NetApiClient,
otterscan::OtterscanClient,
reth::RethApiClient,
reth_engine::RethEngineApiClient,
rpc::RpcApiServer,
testing::TestingApiClient,
trace::TraceApiClient,

View File

@@ -0,0 +1,39 @@
//! Reth-specific engine API extensions.
use alloy_rpc_types_engine::{ExecutionData, PayloadStatus};
use jsonrpsee::{core::RpcResult, proc_macros::rpc};
use serde::{Deserialize, Serialize};
/// Reth-specific payload status that includes server-measured execution latency.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RethPayloadStatus {
/// The standard payload status.
#[serde(flatten)]
pub status: PayloadStatus,
/// Server-side execution latency in microseconds.
pub latency_us: u64,
/// Time spent waiting for persistence to complete, in microseconds.
/// `None` when no persistence was in-flight.
#[serde(skip_serializing_if = "Option::is_none")]
pub persistence_wait_us: Option<u64>,
/// Time spent waiting for the execution cache lock, in microseconds.
pub execution_cache_wait_us: u64,
/// Time spent waiting for the sparse trie lock, in microseconds.
pub sparse_trie_wait_us: u64,
}
/// Reth-specific engine API extensions.
///
/// This trait provides a `reth_newPayload` endpoint that takes `ExecutionData` directly
/// (payload + sidecar), waiting for persistence and cache locks before processing.
///
/// Responses include timing breakdowns with server-measured execution latency.
#[cfg_attr(not(feature = "client"), rpc(server, namespace = "reth"))]
#[cfg_attr(feature = "client", rpc(server, client, namespace = "reth"))]
pub trait RethEngineApi {
/// Reth-specific newPayload that takes `ExecutionData` directly.
///
/// Waits for persistence, execution cache, and sparse trie locks before processing.
#[method(name = "newPayload")]
async fn reth_new_payload(&self, payload: ExecutionData) -> RpcResult<RethPayloadStatus>;
}

View File

@@ -26,7 +26,9 @@ use reth_payload_primitives::{
PayloadOrAttributes, PayloadTypes,
};
use reth_primitives_traits::{Block, BlockBody};
use reth_rpc_api::{EngineApiServer, IntoEngineApiRpcModule};
use reth_rpc_api::{
EngineApiServer, IntoEngineApiRpcModule, RethEngineApiServer, RethPayloadStatus,
};
use reth_storage_api::{BlockReader, HeaderProvider, StateProviderFactory};
use reth_tasks::Runtime;
use reth_transaction_pool::TransactionPool;
@@ -257,6 +259,34 @@ where
pub fn accept_execution_requests_hash(&self) -> bool {
self.inner.accept_execution_requests_hash
}
/// Waits for persistence, execution cache, and sparse trie locks before processing.
///
/// Used by `reth_newPayload` endpoint.
pub async fn reth_new_payload(
&self,
payload: PayloadT::ExecutionData,
) -> EngineApiResult<RethPayloadStatus> {
let (status, timings) = self.inner.beacon_consensus.reth_new_payload(payload).await?;
Ok(RethPayloadStatus {
status,
latency_us: timings.latency.as_micros() as u64,
persistence_wait_us: timings.persistence_wait.map(|d| d.as_micros() as u64),
execution_cache_wait_us: timings.execution_cache_wait.as_micros() as u64,
sparse_trie_wait_us: timings.sparse_trie_wait.as_micros() as u64,
})
}
/// Metered version of `reth_new_payload`.
pub async fn reth_new_payload_metered(
&self,
payload: PayloadT::ExecutionData,
) -> RpcResult<RethPayloadStatus> {
let start = Instant::now();
let res = Self::reth_new_payload(self, payload).await;
self.inner.metrics.latency.new_payload_v1.record(start.elapsed());
Ok(res?)
}
}
impl<Provider, EngineT, Pool, Validator, ChainSpec>
@@ -1283,14 +1313,40 @@ where
}
}
/// Implementation of `RethEngineApiServer` under the `reth_` namespace.
///
/// Waits for execution cache and sparse trie locks before processing.
#[async_trait]
impl<Provider, EngineT, Pool, Validator, ChainSpec> RethEngineApiServer
for EngineApi<Provider, EngineT, Pool, Validator, ChainSpec>
where
Provider: HeaderProvider + BlockReader + StateProviderFactory + 'static,
EngineT: EngineTypes<ExecutionData = ExecutionData>,
Pool: TransactionPool + 'static,
Validator: EngineApiValidator<EngineT>,
ChainSpec: EthereumHardforks + Send + Sync + 'static,
{
async fn reth_new_payload(&self, payload: ExecutionData) -> RpcResult<RethPayloadStatus> {
trace!(target: "rpc::engine", "Serving reth_newPayload");
self.reth_new_payload_metered(payload).await
}
}
impl<Provider, EngineT, Pool, Validator, ChainSpec> IntoEngineApiRpcModule
for EngineApi<Provider, EngineT, Pool, Validator, ChainSpec>
where
EngineT: EngineTypes,
Self: EngineApiServer<EngineT>,
Self: EngineApiServer<EngineT> + RethEngineApiServer,
{
fn into_rpc_module(self) -> RpcModule<()> {
self.into_rpc().remove_context()
let mut module = EngineApiServer::<EngineT>::into_rpc(self.clone()).remove_context();
// Merge reth_newPayload endpoint
module
.merge(RethEngineApiServer::into_rpc(self).remove_context())
.expect("No conflicting methods");
module
}
}