From f1fc9ecafb2a9e9523aacc3626743e3f1410dfb6 Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Sun, 16 Apr 2023 22:49:39 +0200 Subject: [PATCH] chore(rpc): rpc and auth server cleanup (#2272) --- bin/reth/src/args/rpc_server_args.rs | 79 +++++++++++++++-- bin/reth/src/node/mod.rs | 27 ++---- crates/rpc/rpc-builder/src/auth.rs | 119 +++++++++++++++++++++++--- crates/rpc/rpc-builder/src/lib.rs | 123 ++++++++++++++++++++++++++- crates/rpc/rpc-engine-api/src/lib.rs | 3 + 5 files changed, 309 insertions(+), 42 deletions(-) diff --git a/bin/reth/src/args/rpc_server_args.rs b/bin/reth/src/args/rpc_server_args.rs index c63bf111ad..e3996e26e2 100644 --- a/bin/reth/src/args/rpc_server_args.rs +++ b/bin/reth/src/args/rpc_server_args.rs @@ -2,24 +2,26 @@ use crate::dirs::{JwtSecretPath, PlatformPath}; use clap::Args; +use futures::FutureExt; use jsonrpsee::server::ServerHandle; use reth_network_api::{NetworkInfo, Peers}; - use reth_provider::{ BlockProvider, CanonStateSubscriptions, EvmEnvProvider, HeaderProvider, StateProviderFactory, }; use reth_rpc::{JwtError, JwtSecret}; use reth_rpc_builder::{ - constants, error::RpcError, IpcServerBuilder, RethRpcModule, RpcModuleSelection, - RpcServerConfig, RpcServerHandle, ServerBuilder, TransportRpcModuleConfig, + auth::AuthServerConfig, constants, error::RpcError, IpcServerBuilder, RethRpcModule, + RpcModuleBuilder, RpcModuleSelection, RpcServerConfig, RpcServerHandle, ServerBuilder, + TransportRpcModuleConfig, }; -use reth_rpc_engine_api::EngineApi; +use reth_rpc_engine_api::{EngineApi, EngineApiServer}; use reth_tasks::TaskSpawner; use reth_transaction_pool::TransactionPool; use std::{ net::{IpAddr, Ipv4Addr, SocketAddr}, path::Path, }; +use tracing::info; /// Parameters for configuring the rpc more granularity via CLI #[derive(Debug, Args, PartialEq, Default)] @@ -114,8 +116,62 @@ impl RpcServerArgs { } } + /// Configures and launches _all_ servers. + /// + /// Returns the handles for the launched regular RPC server(s) (if any) and the server handle + /// for the auth server that handles the `engine_` API that's accessed by the consensus + /// layer. + pub async fn start_servers( + &self, + client: Client, + pool: Pool, + network: Network, + executor: Tasks, + events: Events, + engine_api: Engine, + ) -> Result<(RpcServerHandle, ServerHandle), RpcError> + where + Client: BlockProvider + + HeaderProvider + + StateProviderFactory + + EvmEnvProvider + + Clone + + Unpin + + 'static, + Pool: TransactionPool + Clone + 'static, + Network: NetworkInfo + Peers + Clone + 'static, + Tasks: TaskSpawner + Clone + 'static, + Events: CanonStateSubscriptions + Clone + 'static, + Engine: EngineApiServer, + { + let auth_config = self.auth_server_config()?; + + let (rpc_modules, auth_module) = RpcModuleBuilder::default() + .with_client(client) + .with_pool(pool) + .with_network(network) + .with_events(events) + .with_executor(executor) + .build_with_auth_server(self.transport_rpc_module_config(), engine_api); + + let server_config = self.rpc_server_config(); + let has_server = server_config.has_server(); + let launch_rpc = rpc_modules.start_server(server_config).inspect(|_| { + if has_server { + info!(target: "reth::cli", "Started RPC server"); + } + }); + + let launch_auth = auth_module.start_server(auth_config).inspect(|_| { + info!(target: "reth::cli", "Started Auth server"); + }); + + // launch servers concurrently + futures::future::try_join(launch_rpc, launch_auth).await + } + /// Convenience function for starting a rpc server with configs which extracted from cli args. - pub(crate) async fn start_rpc_server( + pub async fn start_rpc_server( &self, client: Client, pool: Pool, @@ -149,7 +205,7 @@ impl RpcServerArgs { } /// Create Engine API server. - pub(crate) async fn start_auth_server( + pub async fn start_auth_server( &self, client: Client, pool: Pool, @@ -235,6 +291,17 @@ impl RpcServerArgs { config } + + /// Creates the [AuthServerConfig] from cli args. + fn auth_server_config(&self) -> Result { + let secret = self.jwt_secret().map_err(|err| RpcError::Custom(err.to_string()))?; + let address = SocketAddr::new( + self.auth_addr.unwrap_or(IpAddr::V4(Ipv4Addr::UNSPECIFIED)), + self.auth_port.unwrap_or(constants::DEFAULT_AUTH_PORT), + ); + + Ok(AuthServerConfig::builder(secret).socket_addr(address).build()) + } } #[cfg(test)] diff --git a/bin/reth/src/node/mod.rs b/bin/reth/src/node/mod.rs index 969675148a..57e43d5de3 100644 --- a/bin/reth/src/node/mod.rs +++ b/bin/reth/src/node/mod.rs @@ -11,7 +11,7 @@ use crate::{ use clap::{crate_version, Parser}; use eyre::Context; use fdlimit::raise_fd_limit; -use futures::{pin_mut, stream::select as stream_select, FutureExt, StreamExt}; +use futures::{pin_mut, stream::select as stream_select, StreamExt}; use reth_auto_seal_consensus::{AutoSealBuilder, AutoSealConsensus}; use reth_basic_payload_builder::{BasicPayloadJobGenerator, BasicPayloadJobGeneratorConfig}; use reth_beacon_consensus::{BeaconConsensus, BeaconConsensusEngine, BeaconEngineMessage}; @@ -317,35 +317,18 @@ impl Command { ); info!(target: "reth::cli", "Engine API handler initialized"); - let launch_rpc = self + // Start RPC servers + let (_rpc_server, _auth_server) = self .rpc - .start_rpc_server( + .start_servers( shareable_db.clone(), transaction_pool.clone(), network.clone(), ctx.task_executor.clone(), blockchain_tree, - ) - .inspect(|_| { - info!(target: "reth::cli", "Started RPC server"); - }); - - let launch_auth = self - .rpc - .start_auth_server( - shareable_db.clone(), - transaction_pool.clone(), - network.clone(), - ctx.task_executor.clone(), engine_api, ) - .inspect(|_| { - info!(target: "reth::cli", "Started Auth server"); - }); - - // launch servers - let (_rpc_server, _auth_server) = - futures::future::try_join(launch_rpc, launch_auth).await?; + .await?; // Run consensus engine to completion let (rx, tx) = oneshot::channel(); diff --git a/crates/rpc/rpc-builder/src/auth.rs b/crates/rpc/rpc-builder/src/auth.rs index 9c723f30fd..8f4238b173 100644 --- a/crates/rpc/rpc-builder/src/auth.rs +++ b/crates/rpc/rpc-builder/src/auth.rs @@ -1,25 +1,27 @@ -use crate::error::{RpcError, ServerKind}; +use crate::{ + constants, + error::{RpcError, ServerKind}, +}; pub use jsonrpsee::server::ServerBuilder; use jsonrpsee::server::{RpcModule, ServerHandle}; use reth_network_api::{NetworkInfo, Peers}; - use reth_provider::{BlockProvider, EvmEnvProvider, HeaderProvider, StateProviderFactory}; use reth_rpc::{ - eth::cache::EthStateCache, AuthLayer, EngineApi, EthApi, EthFilter, JwtAuthValidator, JwtSecret, + eth::cache::EthStateCache, AuthLayer, EthApi, EthFilter, JwtAuthValidator, JwtSecret, }; -use reth_rpc_api::servers::*; +use reth_rpc_api::{servers::*, EngineApiServer}; use reth_tasks::TaskSpawner; use reth_transaction_pool::TransactionPool; -use std::net::SocketAddr; +use std::net::{IpAddr, Ipv4Addr, SocketAddr}; -/// Configure and launch an auth server with `engine` and a _new_ `eth` namespace. +/// Configure and launch a _standalone_ auth server with `engine` and a _new_ `eth` namespace. #[allow(clippy::too_many_arguments)] -pub async fn launch( +pub async fn launch( client: Client, pool: Pool, network: Network, executor: Tasks, - engine_api: EngineApi, + engine_api: EngineApi, socket_addr: SocketAddr, secret: JwtSecret, ) -> Result @@ -34,6 +36,7 @@ where Pool: TransactionPool + Clone + 'static, Network: NetworkInfo + Peers + Clone + 'static, Tasks: TaskSpawner + Clone + 'static, + EngineApi: EngineApiServer, { // spawn a new cache task let eth_cache = EthStateCache::spawn_with(client.clone(), Default::default(), executor); @@ -42,11 +45,11 @@ where launch_with_eth_api(eth_api, eth_filter, engine_api, socket_addr, secret).await } -/// Configure and launch an auth server with existing EthApi implementation. -pub async fn launch_with_eth_api( +/// Configure and launch a _standalone_ auth server with existing EthApi implementation. +pub async fn launch_with_eth_api( eth_api: EthApi, eth_filter: EthFilter, - engine_api: EngineApi, + engine_api: EngineApi, socket_addr: SocketAddr, secret: JwtSecret, ) -> Result @@ -60,6 +63,7 @@ where + 'static, Pool: TransactionPool + Clone + 'static, Network: NetworkInfo + Peers + Clone + 'static, + EngineApi: EngineApiServer, { // Configure the module and start the server. let mut module = RpcModule::new(()); @@ -80,3 +84,96 @@ where Ok(server.start(module)?) } + +/// Server configuration for the auth server. +#[derive(Clone, Debug)] +pub struct AuthServerConfig { + /// Where the server should listen. + pub(crate) socket_addr: SocketAddr, + /// The secrete for the auth layer of the server. + pub(crate) secret: JwtSecret, +} + +// === impl AuthServerConfig === + +impl AuthServerConfig { + /// Convenience function to create a new `AuthServerConfig`. + pub fn builder(secret: JwtSecret) -> AuthServerConfigBuilder { + AuthServerConfigBuilder::new(secret) + } + + /// Convenience function to start a server in one step. + pub async fn start(self, module: AuthRpcModule) -> Result { + let Self { socket_addr, secret } = self; + + // Create auth middleware. + let middleware = + tower::ServiceBuilder::new().layer(AuthLayer::new(JwtAuthValidator::new(secret))); + + // By default, both http and ws are enabled. + let server = + ServerBuilder::new().set_middleware(middleware).build(socket_addr).await.map_err( + |err| RpcError::from_jsonrpsee_error(err, ServerKind::Auth(socket_addr)), + )?; + + Ok(server.start(module.inner)?) + } +} + +/// Builder type for configuring an `AuthServerConfig`. +#[derive(Clone, Debug)] +pub struct AuthServerConfigBuilder { + socket_addr: Option, + secret: JwtSecret, +} + +// === impl AuthServerConfigBuilder === + +impl AuthServerConfigBuilder { + /// Create a new `AuthServerConfigBuilder` with the given `secret`. + pub fn new(secret: JwtSecret) -> Self { + Self { socket_addr: None, secret } + } + + /// Set the socket address for the server. + pub fn socket_addr(mut self, socket_addr: SocketAddr) -> Self { + self.socket_addr = Some(socket_addr); + self + } + + /// Set the socket address for the server. + pub fn maybe_socket_addr(mut self, socket_addr: Option) -> Self { + self.socket_addr = socket_addr; + self + } + + /// Set the secret for the server. + pub fn secret(mut self, secret: JwtSecret) -> Self { + self.secret = secret; + self + } + /// Build the `AuthServerConfig`. + pub fn build(self) -> AuthServerConfig { + AuthServerConfig { + socket_addr: self.socket_addr.unwrap_or_else(|| { + SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), constants::DEFAULT_AUTH_PORT) + }), + secret: self.secret, + } + } +} + +/// Holds installed modules for the auth server. +#[derive(Debug)] +pub struct AuthRpcModule { + pub(crate) inner: RpcModule<()>, +} + +// === impl TransportRpcModules === + +impl AuthRpcModule { + /// Convenience function for starting a server + pub async fn start_server(self, config: AuthServerConfig) -> Result { + config.start(self).await + } +} diff --git a/crates/rpc/rpc-builder/src/lib.rs b/crates/rpc/rpc-builder/src/lib.rs index a878019750..51eda8a381 100644 --- a/crates/rpc/rpc-builder/src/lib.rs +++ b/crates/rpc/rpc-builder/src/lib.rs @@ -51,6 +51,51 @@ //! .unwrap(); //! } //! ``` +//! +//! Configure a http and ws server with a separate auth server that handles the `engine_` API +//! +//! +//! ``` +//! use tokio::try_join; +//! use reth_network_api::{NetworkInfo, Peers}; +//! use reth_provider::{BlockProvider, CanonStateSubscriptions, StateProviderFactory, EvmEnvProvider}; +//! use reth_rpc::JwtSecret; +//! use reth_rpc_builder::{RethRpcModule, RpcModuleBuilder, RpcServerConfig, TransportRpcModuleConfig}; +//! use reth_tasks::TokioTaskExecutor; +//! use reth_transaction_pool::TransactionPool; +//! use reth_rpc_api::EngineApiServer; +//! use reth_rpc_builder::auth::AuthServerConfig; +//! pub async fn launch(client: Client, pool: Pool, network: Network, events: Events, engine_api: EngineApi) +//! where +//! Client: BlockProvider + StateProviderFactory + EvmEnvProvider + Clone + Unpin + 'static, +//! Pool: TransactionPool + Clone + 'static, +//! Network: NetworkInfo + Peers + Clone + 'static, +//! Events: CanonStateSubscriptions + Clone + 'static, +//! EngineApi: EngineApiServer +//! { +//! // configure the rpc module per transport +//! let transports = TransportRpcModuleConfig::default().with_http(vec![ +//! RethRpcModule::Admin, +//! RethRpcModule::Debug, +//! RethRpcModule::Eth, +//! RethRpcModule::Web3, +//! ]); +//! let builder = RpcModuleBuilder::new(client, pool, network, TokioTaskExecutor::default(), events); +//! +//! // configure the server modules +//! let (modules, auth_module) = builder.build_with_auth_server(transports, engine_api); +//! +//! // start the servers +//! let auth_config = AuthServerConfig::builder(JwtSecret::random()).build(); +//! let config = RpcServerConfig::default(); +//! +//! let (_rpc_handle, _auth_handle) = try_join!( +//! modules.start_server(config), +//! auth_module.start_server(auth_config), +//! ).unwrap(); +//! +//! } +//! ``` use constants::*; use error::{RpcError, ServerKind}; @@ -66,7 +111,7 @@ use reth_rpc::{ eth::cache::EthStateCache, AdminApi, DebugApi, EthApi, EthFilter, EthPubSub, EthSubscriptionIdProvider, NetApi, TraceApi, TracingCallGuard, Web3Api, }; -use reth_rpc_api::servers::*; +use reth_rpc_api::{servers::*, EngineApiServer}; use reth_tasks::TaskSpawner; use reth_transaction_pool::TransactionPool; use serde::{Deserialize, Serialize, Serializer}; @@ -77,7 +122,6 @@ use std::{ str::FromStr, }; use strum::{AsRefStr, EnumString, EnumVariantNames, ParseError, VariantNames}; - use tower::layer::util::{Identity, Stack}; use tower_http::cors::CorsLayer; use tracing::{instrument, trace}; @@ -98,6 +142,7 @@ mod eth; pub mod constants; // re-export for convenience +use crate::auth::AuthRpcModule; pub use crate::eth::{EthConfig, EthHandlers}; pub use jsonrpsee::server::ServerBuilder; pub use reth_ipc::server::{Builder as IpcServerBuilder, Endpoint}; @@ -130,7 +175,7 @@ where /// A builder type to configure the RPC module: See [RpcModule] /// /// This is the main entrypoint for up RPC servers. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct RpcModuleBuilder { /// The Client type to when creating all rpc handlers client: Client, @@ -212,6 +257,42 @@ where Tasks: TaskSpawner + Clone + 'static, Events: CanonStateSubscriptions + Clone + 'static, { + /// Configures all [RpcModule]s specific to the given [TransportRpcModuleConfig] which can be + /// used to start the transport server(s). + /// + /// And also configures the auth server, which also exposes the `eth_` namespace. + pub fn build_with_auth_server( + self, + module_config: TransportRpcModuleConfig, + engine: EngineApi, + ) -> (TransportRpcModules<()>, AuthRpcModule) + where + EngineApi: EngineApiServer, + { + let mut modules = TransportRpcModules::default(); + + let Self { client, pool, network, executor, events } = self; + + let TransportRpcModuleConfig { http, ws, ipc, config } = module_config; + + let mut registry = RethModuleRegistry::new( + client, + pool, + network, + executor, + events, + config.unwrap_or_default(), + ); + + modules.http = registry.maybe_module(http.as_ref()); + modules.ws = registry.maybe_module(ws.as_ref()); + modules.ipc = registry.maybe_module(ipc.as_ref()); + + let auth_module = registry.create_auth_module(engine); + + (modules, auth_module) + } + /// Configures all [RpcModule]s specific to the given [TransportRpcModuleConfig] which can be /// used to start the transport server(s). /// @@ -548,6 +629,28 @@ where self } + /// Configures the auth module that includes the + /// * `engine_` namespace + /// * `api_` namespace + /// + /// Note: This does _not_ register the `engine_` in this registry. + pub fn create_auth_module(&mut self, engine_api: EngineApi) -> AuthRpcModule + where + EngineApi: EngineApiServer, + { + let eth_handlers = self.eth_handlers(); + let mut module = RpcModule::new(()); + + module.merge(engine_api.into_rpc()).expect("No conflicting methods"); + + // also merge all `eth_` handlers + module.merge(eth_handlers.api.into_rpc()).expect("No conflicting methods"); + module.merge(eth_handlers.filter.into_rpc()).expect("No conflicting methods"); + module.merge(eth_handlers.pubsub.into_rpc()).expect("No conflicting methods"); + + AuthRpcModule { inner: module } + } + /// Register Net Namespace pub fn register_net(&mut self) -> &mut Self { let eth_api = self.eth_api(); @@ -668,6 +771,11 @@ where f(self.eth.as_ref().expect("exists; qed")) } + /// Returns the configured [EthHandlers] or creates it if it does not exist yet + fn eth_handlers(&mut self) -> EthHandlers { + self.with_eth(|handlers| handlers.clone()) + } + /// Returns the configured [EthApi] or creates it if it does not exist yet fn eth_api(&mut self) -> EthApi { self.with_eth(|handlers| handlers.api.clone()) @@ -827,6 +935,15 @@ impl RpcServerConfig { self } + /// Returns true if any server is configured. + /// + /// If no server is configured, no server will be be launched on [RpcServerConfig::start]. + pub fn has_server(&self) -> bool { + self.http_server_config.is_some() || + self.ws_server_config.is_some() || + self.ipc_server_config.is_some() + } + /// Returns the [SocketAddr] of the http server pub fn http_address(&self) -> Option { self.http_addr diff --git a/crates/rpc/rpc-engine-api/src/lib.rs b/crates/rpc/rpc-engine-api/src/lib.rs index 49d08e4eb3..2a68d3d12e 100644 --- a/crates/rpc/rpc-engine-api/src/lib.rs +++ b/crates/rpc/rpc-engine-api/src/lib.rs @@ -20,3 +20,6 @@ mod error; pub use engine_api::{EngineApi, EngineApiSender}; pub use error::*; pub use message::EngineApiMessageVersion; + +// re-export server trait for convenience +pub use reth_rpc_api::EngineApiServer;