feat: make RpcAddOns::launch_add_ons_with composable (#16646)

This commit is contained in:
Federico Gimenez
2025-06-04 19:22:58 +02:00
committed by GitHub
parent a5c09cf4af
commit 19caec3dd9
3 changed files with 239 additions and 57 deletions

View File

@@ -3,7 +3,6 @@
use crate::{BeaconConsensusEngineEvent, BeaconConsensusEngineHandle};
use alloy_rpc_types::engine::ClientVersionV1;
use alloy_rpc_types_engine::ExecutionData;
use futures::TryFutureExt;
use jsonrpsee::RpcModule;
use reth_chain_state::CanonStateSubscriptions;
use reth_chainspec::{ChainSpecProvider, EthereumHardforks};
@@ -21,7 +20,7 @@ use reth_rpc_api::{eth::helpers::AddDevSigners, IntoEngineApiRpcModule};
use reth_rpc_builder::{
auth::{AuthRpcModule, AuthServerHandle},
config::RethRpcServerConfig,
RpcModuleBuilder, RpcRegistryInner, RpcServerHandle, TransportRpcModules,
RpcModuleBuilder, RpcRegistryInner, RpcServerConfig, RpcServerHandle, TransportRpcModules,
};
use reth_rpc_engine_api::{capabilities::EngineCapabilities, EngineApi};
use reth_rpc_eth_types::{cache::cache_new_blocks_task, EthConfig, EthStateCache};
@@ -228,6 +227,17 @@ where
}
}
/// Helper container for the parameters commonly passed to RPC module extension functions.
#[expect(missing_debug_implementations)]
pub struct RpcModuleContainer<'a, Node: FullNodeComponents, EthApi: EthApiTypes> {
/// Holds installed modules per transport type.
pub modules: &'a mut TransportRpcModules,
/// Holds jwt authenticated rpc module.
pub auth_module: &'a mut AuthRpcModule,
/// A Helper type the holds instances of the configured modules.
pub registry: &'a mut RpcRegistry<Node, EthApi>,
}
/// Helper container to encapsulate [`RpcRegistryInner`], [`TransportRpcModules`] and
/// [`AuthRpcModule`].
///
@@ -343,6 +353,55 @@ where
}
}
/// Handle returned when only the regular RPC server (HTTP/WS/IPC) is launched.
///
/// This handle provides access to the RPC server endpoints and registry, but does not
/// include an authenticated Engine API server. Use this when you only need regular
/// RPC functionality.
#[derive(Debug, Clone)]
pub struct RpcServerOnlyHandle<Node: FullNodeComponents, EthApi: EthApiTypes> {
/// Handle to the RPC server
pub rpc_server_handle: RpcServerHandle,
/// Configured RPC modules.
pub rpc_registry: RpcRegistry<Node, EthApi>,
/// Notification channel for engine API events
pub engine_events:
EventSender<BeaconConsensusEngineEvent<<Node::Types as NodeTypes>::Primitives>>,
/// Handle to the consensus engine.
pub engine_handle: BeaconConsensusEngineHandle<<Node::Types as NodeTypes>::Payload>,
}
/// Handle returned when only the authenticated Engine API server is launched.
///
/// This handle provides access to the Engine API server and registry, but does not
/// include the regular RPC servers (HTTP/WS/IPC). Use this for specialized setups
/// that only need Engine API functionality.
#[derive(Debug, Clone)]
pub struct AuthServerOnlyHandle<Node: FullNodeComponents, EthApi: EthApiTypes> {
/// Handle to the auth server (engine API)
pub auth_server_handle: AuthServerHandle,
/// Configured RPC modules.
pub rpc_registry: RpcRegistry<Node, EthApi>,
/// Notification channel for engine API events
pub engine_events:
EventSender<BeaconConsensusEngineEvent<<Node::Types as NodeTypes>::Primitives>>,
/// Handle to the consensus engine.
pub engine_handle: BeaconConsensusEngineHandle<<Node::Types as NodeTypes>::Payload>,
}
/// Internal context struct for RPC setup shared between different launch methods
struct RpcSetupContext<'a, Node: FullNodeComponents, EthApi: EthApiTypes> {
node: Node,
config: &'a NodeConfig<<Node::Types as NodeTypes>::ChainSpec>,
modules: TransportRpcModules,
auth_module: AuthRpcModule,
auth_config: reth_rpc_builder::auth::AuthServerConfig,
registry: RpcRegistry<Node, EthApi>,
on_rpc_started: Box<dyn OnRpcStarted<Node, EthApi>>,
engine_events: EventSender<BeaconConsensusEngineEvent<<Node::Types as NodeTypes>::Primitives>>,
engine_handle: BeaconConsensusEngineHandle<<Node::Types as NodeTypes>::Payload>,
}
/// Node add-ons containing RPC server configuration, with customizable eth API handler.
///
/// This struct can be used to provide the RPC server functionality. It is responsible for launching
@@ -461,6 +520,55 @@ where
EV: EngineValidatorBuilder<N>,
EB: EngineApiBuilder<N>,
{
/// Launches only the regular RPC server (HTTP/WS/IPC), without the authenticated Engine API
/// server.
///
/// This is useful when you only need the regular RPC functionality and want to avoid
/// starting the auth server.
pub async fn launch_rpc_server<F>(
self,
ctx: AddOnsContext<'_, N>,
ext: F,
) -> eyre::Result<RpcServerOnlyHandle<N, EthB::EthApi>>
where
F: FnOnce(RpcModuleContainer<'_, N, EthB::EthApi>) -> eyre::Result<()>,
{
let setup_ctx = self.setup_rpc_components(ctx, ext).await?;
let RpcSetupContext {
node,
config,
mut modules,
mut auth_module,
auth_config: _,
mut registry,
on_rpc_started,
engine_events,
engine_handle,
} = setup_ctx;
let server_config = config.rpc.rpc_server_config();
let rpc_server_handle = Self::launch_rpc_server_internal(server_config, &modules).await?;
let handles =
RethRpcServerHandles { rpc: rpc_server_handle.clone(), auth: AuthServerHandle::noop() };
Self::finalize_rpc_setup(
&mut registry,
&mut modules,
&mut auth_module,
&node,
config,
on_rpc_started,
handles,
)?;
Ok(RpcServerOnlyHandle {
rpc_server_handle,
rpc_registry: registry,
engine_events,
engine_handle,
})
}
/// Launches the RPC servers with the given context and an additional hook for extending
/// modules.
pub async fn launch_add_ons_with<F>(
@@ -469,11 +577,59 @@ where
ext: F,
) -> eyre::Result<RpcHandle<N, EthB::EthApi>>
where
F: FnOnce(
&mut TransportRpcModules,
&mut AuthRpcModule,
&mut RpcRegistry<N, EthB::EthApi>,
) -> eyre::Result<()>,
F: FnOnce(RpcModuleContainer<'_, N, EthB::EthApi>) -> eyre::Result<()>,
{
let setup_ctx = self.setup_rpc_components(ctx, ext).await?;
let RpcSetupContext {
node,
config,
mut modules,
mut auth_module,
auth_config,
mut registry,
on_rpc_started,
engine_events,
engine_handle,
} = setup_ctx;
let server_config = config.rpc.rpc_server_config();
let auth_module_clone = auth_module.clone();
// launch servers concurrently
let (rpc, auth) = futures::future::try_join(
Self::launch_rpc_server_internal(server_config, &modules),
Self::launch_auth_server_internal(auth_module_clone, auth_config),
)
.await?;
let handles = RethRpcServerHandles { rpc, auth };
Self::finalize_rpc_setup(
&mut registry,
&mut modules,
&mut auth_module,
&node,
config,
on_rpc_started,
handles.clone(),
)?;
Ok(RpcHandle {
rpc_server_handles: handles,
rpc_registry: registry,
engine_events,
beacon_engine_handle: engine_handle,
})
}
/// Common setup for RPC server initialization
async fn setup_rpc_components<'a, F>(
self,
ctx: AddOnsContext<'a, N>,
ext: F,
) -> eyre::Result<RpcSetupContext<'a, N, EthB::EthApi>>
where
F: FnOnce(RpcModuleContainer<'_, N, EthB::EthApi>) -> eyre::Result<()>,
{
let Self { eth_api_builder, engine_api_builder, hooks, .. } = self;
@@ -529,56 +685,79 @@ where
let RpcHooks { on_rpc_started, extend_rpc_modules } = hooks;
ext(ctx.modules, ctx.auth_module, ctx.registry)?;
ext(RpcModuleContainer {
modules: ctx.modules,
auth_module: ctx.auth_module,
registry: ctx.registry,
})?;
extend_rpc_modules.extend_rpc_modules(ctx)?;
let server_config = config.rpc.rpc_server_config();
let cloned_modules = modules.clone();
let launch_rpc = server_config.start(&cloned_modules).map_ok(|handle| {
if let Some(path) = handle.ipc_endpoint() {
info!(target: "reth::cli", %path, "RPC IPC server started");
}
if let Some(addr) = handle.http_local_addr() {
info!(target: "reth::cli", url=%addr, "RPC HTTP server started");
}
if let Some(addr) = handle.ws_local_addr() {
info!(target: "reth::cli", url=%addr, "RPC WS server started");
}
handle
});
let launch_auth = auth_module.clone().start_server(auth_config).map_ok(|handle| {
let addr = handle.local_addr();
if let Some(ipc_endpoint) = handle.ipc_endpoint() {
info!(target: "reth::cli", url=%addr, ipc_endpoint=%ipc_endpoint, "RPC auth server started");
} else {
info!(target: "reth::cli", url=%addr, "RPC auth server started");
}
handle
});
// launch servers concurrently
let (rpc, auth) = futures::future::try_join(launch_rpc, launch_auth).await?;
let handles = RethRpcServerHandles { rpc, auth };
let ctx = RpcContext {
node: node.clone(),
Ok(RpcSetupContext {
node,
config,
registry: &mut registry,
modules: &mut modules,
auth_module: &mut auth_module,
};
on_rpc_started.on_rpc_started(ctx, handles.clone())?;
Ok(RpcHandle {
rpc_server_handles: handles,
rpc_registry: registry,
modules,
auth_module,
auth_config,
registry,
on_rpc_started,
engine_events,
beacon_engine_handle,
engine_handle: beacon_engine_handle,
})
}
/// Helper to launch the RPC server
async fn launch_rpc_server_internal(
server_config: RpcServerConfig,
modules: &TransportRpcModules,
) -> eyre::Result<RpcServerHandle> {
let handle = server_config.start(modules).await?;
if let Some(path) = handle.ipc_endpoint() {
info!(target: "reth::cli", %path, "RPC IPC server started");
}
if let Some(addr) = handle.http_local_addr() {
info!(target: "reth::cli", url=%addr, "RPC HTTP server started");
}
if let Some(addr) = handle.ws_local_addr() {
info!(target: "reth::cli", url=%addr, "RPC WS server started");
}
Ok(handle)
}
/// Helper to launch the auth server
async fn launch_auth_server_internal(
auth_module: AuthRpcModule,
auth_config: reth_rpc_builder::auth::AuthServerConfig,
) -> eyre::Result<AuthServerHandle> {
auth_module.start_server(auth_config)
.await
.map_err(Into::into)
.inspect(|handle| {
let addr = handle.local_addr();
if let Some(ipc_endpoint) = handle.ipc_endpoint() {
info!(target: "reth::cli", url=%addr, ipc_endpoint=%ipc_endpoint, "RPC auth server started");
} else {
info!(target: "reth::cli", url=%addr, "RPC auth server started");
}
})
}
/// Helper to finalize RPC setup by creating context and calling hooks
fn finalize_rpc_setup(
registry: &mut RpcRegistry<N, EthB::EthApi>,
modules: &mut TransportRpcModules,
auth_module: &mut AuthRpcModule,
node: &N,
config: &NodeConfig<<N::Types as NodeTypes>::ChainSpec>,
on_rpc_started: Box<dyn OnRpcStarted<N, EthB::EthApi>>,
handles: RethRpcServerHandles,
) -> eyre::Result<()> {
let ctx = RpcContext { node: node.clone(), config, registry, modules, auth_module };
on_rpc_started.on_rpc_started(ctx, handles)?;
Ok(())
}
}
impl<N, EthB, EV, EB> NodeAddOns<N> for RpcAddOns<N, EthB, EV, EB>
@@ -592,7 +771,7 @@ where
type Handle = RpcHandle<N, EthB::EthApi>;
async fn launch_add_ons(self, ctx: AddOnsContext<'_, N>) -> eyre::Result<Self::Handle> {
self.launch_add_ons_with(ctx, |_, _, _| Ok(())).await
self.launch_add_ons_with(ctx, |_| Ok(())).await
}
}