refactor(rpc): extract RethEngineApi into standalone struct (#22504)

Co-authored-by: Amp <amp@ampcode.com>
Co-authored-by: Alexey Shekhirin <github@shekhirin.com>
Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Georgios Konstantopoulos
2026-02-24 02:57:13 -08:00
committed by GitHub
parent ef292ffa00
commit d3bb2faf28
8 changed files with 87 additions and 66 deletions

1
Cargo.lock generated
View File

@@ -9890,6 +9890,7 @@ dependencies = [
"reth-node-core",
"reth-node-ethereum",
"reth-payload-builder",
"reth-payload-primitives",
"reth-primitives-traits",
"reth-provider",
"reth-rpc",

View File

@@ -1020,7 +1020,13 @@ where
.with_executor(node.task_executor().clone())
.with_evm_config(node.evm_config().clone())
.with_consensus(node.consensus().clone())
.build_with_auth_server(module_config, engine_api, eth_api, engine_events.clone());
.build_with_auth_server(
module_config,
engine_api,
eth_api,
engine_events.clone(),
beacon_engine_handle.clone(),
);
// in dev mode we generate 20 random dev-signer accounts
if config.dev.dev {

View File

@@ -1,6 +1,6 @@
//! Reth-specific engine API extensions.
use alloy_rpc_types_engine::{ExecutionData, PayloadStatus};
use alloy_rpc_types_engine::PayloadStatus;
use jsonrpsee::{core::RpcResult, proc_macros::rpc};
use serde::{Deserialize, Serialize};
@@ -30,7 +30,7 @@ pub struct RethPayloadStatus {
/// Responses include timing breakdowns with server-measured execution latency.
#[cfg_attr(not(feature = "client"), rpc(server, namespace = "reth"))]
#[cfg_attr(feature = "client", rpc(server, client, namespace = "reth"))]
pub trait RethEngineApi {
pub trait RethEngineApi<ExecutionData> {
/// Reth-specific newPayload that takes `ExecutionData` directly.
///
/// Waits for persistence, execution cache, and sparse trie locks before processing.

View File

@@ -18,6 +18,8 @@ reth-ipc.workspace = true
reth-chainspec.workspace = true
reth-consensus.workspace = true
reth-engine-primitives.workspace = true
reth-rpc-engine-api.workspace = true
reth-payload-primitives.workspace = true
reth-network-api.workspace = true
reth-node-core.workspace = true
reth-rpc.workspace = true

View File

@@ -32,15 +32,17 @@ use jsonrpsee::{
};
use reth_chainspec::{ChainSpecProvider, EthereumHardforks};
use reth_consensus::FullConsensus;
use reth_engine_primitives::ConsensusEngineEvent;
use reth_engine_primitives::{ConsensusEngineEvent, ConsensusEngineHandle};
use reth_evm::ConfigureEvm;
use reth_network_api::{noop::NoopNetwork, NetworkInfo, Peers};
use reth_payload_primitives::PayloadTypes;
use reth_primitives_traits::{NodePrimitives, TxTy};
use reth_rpc::{
AdminApi, DebugApi, EngineEthApi, EthApi, EthApiBuilder, EthBundle, MinerApi, NetApi,
OtterscanApi, RPCApi, RethApi, TraceApi, TxPoolApi, Web3Api,
};
use reth_rpc_api::servers::*;
use reth_rpc_engine_api::RethEngineApi;
use reth_rpc_eth_api::{
helpers::{
pending_block::PendingEnvBuilder, Call, EthApiSpec, EthTransactions, LoadPendingBlock,
@@ -327,12 +329,13 @@ where
/// This behaves exactly as [`RpcModuleBuilder::build`] for the [`TransportRpcModules`], but
/// also configures the auth (engine api) server, which exposes a subset of the `eth_`
/// namespace.
pub fn build_with_auth_server<EthApi>(
pub fn build_with_auth_server<EthApi, Payload>(
self,
module_config: TransportRpcModuleConfig,
engine: impl IntoEngineApiRpcModule,
eth: EthApi,
engine_events: EventSender<ConsensusEngineEvent<N>>,
beacon_engine_handle: ConsensusEngineHandle<Payload>,
) -> (
TransportRpcModules,
AuthRpcModule,
@@ -340,12 +343,13 @@ where
)
where
EthApi: FullEthApiServer<Provider = Provider, Pool = Pool>,
Payload: PayloadTypes,
{
let config = module_config.config.clone().unwrap_or_default();
let mut registry = self.into_registry(config, eth, engine_events);
let modules = registry.create_transport_rpc_modules(module_config);
let auth_module = registry.create_auth_module(engine);
let auth_module = registry.create_auth_module(engine, beacon_engine_handle);
(modules, auth_module, registry)
}
@@ -875,12 +879,26 @@ where
{
/// Configures the auth module that includes the
/// * `engine_` namespace
/// * `reth_` namespace
/// * `api_` namespace
///
/// Note: This does _not_ register the `engine_` in this registry.
pub fn create_auth_module(&self, engine_api: impl IntoEngineApiRpcModule) -> AuthRpcModule {
pub fn create_auth_module<Payload>(
&self,
engine_api: impl IntoEngineApiRpcModule,
beacon_engine_handle: ConsensusEngineHandle<Payload>,
) -> AuthRpcModule
where
Payload: PayloadTypes,
{
let mut module = engine_api.into_rpc_module();
// Merge reth_* endpoints
let reth_engine_api = RethEngineApi::new(beacon_engine_handle);
module
.merge(RethEngineApiServer::into_rpc(reth_engine_api).remove_context())
.expect("No conflicting methods");
// also merge a subset of `eth_` handlers
let eth_handlers = self.eth_handlers();
let engine_eth = EngineEthApi::new(eth_handlers.api.clone(), eth_handlers.filter.clone());

View File

@@ -26,9 +26,7 @@ use reth_payload_primitives::{
PayloadOrAttributes, PayloadTypes,
};
use reth_primitives_traits::{Block, BlockBody};
use reth_rpc_api::{
EngineApiServer, IntoEngineApiRpcModule, RethEngineApiServer, RethPayloadStatus,
};
use reth_rpc_api::{EngineApiServer, IntoEngineApiRpcModule};
use reth_storage_api::{BlockReader, HeaderProvider, StateProviderFactory};
use reth_tasks::Runtime;
use reth_transaction_pool::TransactionPool;
@@ -259,34 +257,6 @@ where
pub fn accept_execution_requests_hash(&self) -> bool {
self.inner.accept_execution_requests_hash
}
/// Waits for persistence, execution cache, and sparse trie locks before processing.
///
/// Used by `reth_newPayload` endpoint.
pub async fn reth_new_payload(
&self,
payload: PayloadT::ExecutionData,
) -> EngineApiResult<RethPayloadStatus> {
let (status, timings) = self.inner.beacon_consensus.reth_new_payload(payload).await?;
Ok(RethPayloadStatus {
status,
latency_us: timings.latency.as_micros() as u64,
persistence_wait_us: timings.persistence_wait.map(|d| d.as_micros() as u64),
execution_cache_wait_us: timings.execution_cache_wait.as_micros() as u64,
sparse_trie_wait_us: timings.sparse_trie_wait.as_micros() as u64,
})
}
/// Metered version of `reth_new_payload`.
pub async fn reth_new_payload_metered(
&self,
payload: PayloadT::ExecutionData,
) -> RpcResult<RethPayloadStatus> {
let start = Instant::now();
let res = Self::reth_new_payload(self, payload).await;
self.inner.metrics.latency.new_payload_v1.record(start.elapsed());
Ok(res?)
}
}
impl<Provider, EngineT, Pool, Validator, ChainSpec>
@@ -1313,40 +1283,14 @@ where
}
}
/// Implementation of `RethEngineApiServer` under the `reth_` namespace.
///
/// Waits for execution cache and sparse trie locks before processing.
#[async_trait]
impl<Provider, EngineT, Pool, Validator, ChainSpec> RethEngineApiServer
for EngineApi<Provider, EngineT, Pool, Validator, ChainSpec>
where
Provider: HeaderProvider + BlockReader + StateProviderFactory + 'static,
EngineT: EngineTypes<ExecutionData = ExecutionData>,
Pool: TransactionPool + 'static,
Validator: EngineApiValidator<EngineT>,
ChainSpec: EthereumHardforks + Send + Sync + 'static,
{
async fn reth_new_payload(&self, payload: ExecutionData) -> RpcResult<RethPayloadStatus> {
trace!(target: "rpc::engine", "Serving reth_newPayload");
self.reth_new_payload_metered(payload).await
}
}
impl<Provider, EngineT, Pool, Validator, ChainSpec> IntoEngineApiRpcModule
for EngineApi<Provider, EngineT, Pool, Validator, ChainSpec>
where
EngineT: EngineTypes,
Self: EngineApiServer<EngineT> + RethEngineApiServer,
Self: EngineApiServer<EngineT>,
{
fn into_rpc_module(self) -> RpcModule<()> {
let mut module = EngineApiServer::<EngineT>::into_rpc(self.clone()).remove_context();
// Merge reth_newPayload endpoint
module
.merge(RethEngineApiServer::into_rpc(self).remove_context())
.expect("No conflicting methods");
module
EngineApiServer::<EngineT>::into_rpc(self).remove_context()
}
}

View File

@@ -12,6 +12,9 @@
/// The Engine API implementation.
mod engine_api;
/// Reth-specific engine API extensions.
mod reth_engine_api;
/// Engine API capabilities.
pub mod capabilities;
pub use capabilities::EngineCapabilities;
@@ -24,6 +27,7 @@ mod metrics;
pub use engine_api::{EngineApi, EngineApiSender};
pub use error::*;
pub use reth_engine_api::RethEngineApi;
// re-export server trait for convenience
pub use reth_rpc_api::EngineApiServer;

View File

@@ -0,0 +1,46 @@
use crate::EngineApiError;
use async_trait::async_trait;
use jsonrpsee_core::RpcResult;
use reth_engine_primitives::ConsensusEngineHandle;
use reth_payload_primitives::PayloadTypes;
use reth_rpc_api::{RethEngineApiServer, RethPayloadStatus};
use tracing::trace;
/// Standalone implementation of the `reth_` engine API namespace.
///
/// Provides the `reth_newPayload` endpoint that takes `ExecutionData` directly,
/// waits for persistence, execution cache, and sparse trie locks before processing,
/// and returns timing breakdowns with server-measured execution latency.
#[derive(Debug)]
pub struct RethEngineApi<Payload: PayloadTypes> {
beacon_engine_handle: ConsensusEngineHandle<Payload>,
}
impl<Payload: PayloadTypes> RethEngineApi<Payload> {
/// Creates a new [`RethEngineApi`].
pub const fn new(beacon_engine_handle: ConsensusEngineHandle<Payload>) -> Self {
Self { beacon_engine_handle }
}
}
#[async_trait]
impl<Payload: PayloadTypes> RethEngineApiServer<Payload::ExecutionData> for RethEngineApi<Payload> {
async fn reth_new_payload(
&self,
payload: Payload::ExecutionData,
) -> RpcResult<RethPayloadStatus> {
trace!(target: "rpc::engine", "Serving reth_newPayload");
let (status, timings) = self
.beacon_engine_handle
.reth_new_payload(payload)
.await
.map_err(EngineApiError::from)?;
Ok(RethPayloadStatus {
status,
latency_us: timings.latency.as_micros() as u64,
persistence_wait_us: timings.persistence_wait.map(|d| d.as_micros() as u64),
execution_cache_wait_us: timings.execution_cache_wait.as_micros() as u64,
sparse_trie_wait_us: timings.sparse_trie_wait.as_micros() as u64,
})
}
}