diff --git a/bin/reth/src/args/rpc_server_args.rs b/bin/reth/src/args/rpc_server_args.rs index 35d9e79459..13e9f20167 100644 --- a/bin/reth/src/args/rpc_server_args.rs +++ b/bin/reth/src/args/rpc_server_args.rs @@ -1,6 +1,9 @@ //! clap [Args](clap::Args) for RPC related arguments. -use crate::args::GasPriceOracleArgs; +use crate::{ + args::GasPriceOracleArgs, + cli::ext::{NoopArgsExt, RethRpcConfig, RethRpcServerArgsExt}, +}; use clap::{ builder::{PossibleValue, RangedU64ValueParser, TypedValueParser}, Arg, Args, Command, @@ -52,9 +55,9 @@ pub(crate) const RPC_DEFAULT_MAX_CONNECTIONS: u32 = 100; pub(crate) const RPC_DEFAULT_MAX_TRACING_REQUESTS: u32 = 25; /// Parameters for configuring the rpc more granularity via CLI -#[derive(Debug, Args, PartialEq, Eq)] +#[derive(Debug, Args)] #[command(next_help_heading = "RPC")] -pub struct RpcServerArgs { +pub struct RpcServerArgs { /// Enable the HTTP-RPC server #[arg(long, default_value_if("dev", "true", "true"))] pub http: bool, @@ -160,9 +163,13 @@ pub struct RpcServerArgs { /// Maximum number of env cache entries. #[arg(long, default_value_t = DEFAULT_ENV_CACHE_MAX_LEN)] pub env_cache_len: u32, + + /// Additional arguments for rpc. + #[clap(flatten)] + pub ext: Ext, } -impl RpcServerArgs { +impl RpcServerArgs { /// Returns the max request size in bytes. pub fn rpc_max_request_size_bytes(&self) -> u32 { self.rpc_max_request_size * 1024 * 1024 @@ -183,21 +190,6 @@ impl RpcServerArgs { ) } - /// Extracts the [EthConfig] from the args. - pub fn eth_config(&self) -> EthConfig { - EthConfig::default() - .max_tracing_requests(self.rpc_max_tracing_requests) - .rpc_gas_cap(self.rpc_gas_cap) - .gpo_config(self.gas_price_oracle_config()) - } - - /// Convenience function that returns whether ipc is enabled - /// - /// By default IPC is enabled therefor it is enabled if the `ipcdisable` is false. - fn is_ipc_enabled(&self) -> bool { - !self.ipcdisable - } - /// The execution layer and consensus layer clients SHOULD accept a configuration parameter: /// jwt-secret, which designates a file containing the hex-encoded 256 bit secret key to be used /// for verifying/generating JWT tokens. @@ -244,7 +236,7 @@ impl RpcServerArgs { events: Events, engine_api: Engine, jwt_secret: JwtSecret, - ) -> Result<(RpcServerHandle, AuthServerHandle), RpcError> + ) -> eyre::Result<(RpcServerHandle, AuthServerHandle)> where Provider: BlockReaderIdExt + HeaderProvider @@ -266,7 +258,7 @@ impl RpcServerArgs { let module_config = self.transport_rpc_module_config(); debug!(target: "reth::cli", http=?module_config.http(), ws=?module_config.ws(), "Using RPC module config"); - let (rpc_modules, auth_module) = RpcModuleBuilder::default() + let (mut rpc_modules, auth_module, mut registry) = RpcModuleBuilder::default() .with_provider(provider) .with_pool(pool) .with_network(network) @@ -274,6 +266,9 @@ impl RpcServerArgs { .with_executor(executor) .build_with_auth_server(module_config, engine_api); + // apply configured customization + self.ext.extend_rpc_modules(self, &mut registry, &mut rpc_modules)?; + let server_config = self.rpc_server_config(); let launch_rpc = rpc_modules.start_server(server_config).map_ok(|handle| { if let Some(url) = handle.ipc_endpoint() { @@ -295,7 +290,7 @@ impl RpcServerArgs { }); // launch servers concurrently - futures::future::try_join(launch_rpc, launch_auth).await + Ok(futures::future::try_join(launch_rpc, launch_auth).await?) } /// Convenience function for starting a rpc server with configs which extracted from cli args. @@ -454,6 +449,20 @@ impl RpcServerArgs { } } +impl RethRpcConfig for RpcServerArgs { + fn is_ipc_enabled(&self) -> bool { + // By default IPC is enabled therefor it is enabled if the `ipcdisable` is false. + !self.ipcdisable + } + + fn eth_config(&self) -> EthConfig { + EthConfig::default() + .max_tracing_requests(self.rpc_max_tracing_requests) + .rpc_gas_cap(self.rpc_gas_cap) + .gpo_config(self.gas_price_oracle_config()) + } +} + /// clap value parser for [RpcModuleSelection]. #[derive(Clone, Debug, Default)] #[non_exhaustive] diff --git a/bin/reth/src/cli/ext.rs b/bin/reth/src/cli/ext.rs new file mode 100644 index 0000000000..11db2ca347 --- /dev/null +++ b/bin/reth/src/cli/ext.rs @@ -0,0 +1,94 @@ +//! Support for integrating customizations into the CLI. + +use clap::Args; +use reth_network_api::{NetworkInfo, Peers}; +use reth_provider::{ + BlockReaderIdExt, CanonStateSubscriptions, ChainSpecProvider, ChangeSetReader, EvmEnvProvider, + StateProviderFactory, +}; +use reth_rpc_builder::{EthConfig, RethModuleRegistry, TransportRpcModules}; +use reth_tasks::TaskSpawner; +use reth_transaction_pool::TransactionPool; +use std::fmt; + +/// A trait that allows for extending parts of the CLI with additional functionality. +pub trait RethCliExt { + /// Extends the rpc arguments for the node + type RpcExt: RethRpcServerArgsExt; +} + +impl RethCliExt for () { + type RpcExt = NoopArgsExt; +} + +/// An [Args] extension that does nothing. +#[derive(Debug, Clone, Copy, Default, Args)] +pub struct NoopArgsExt; + +/// A trait that provides configured RPC server. +/// +/// This provides all basic config values for the RPC server and is implemented by the +/// [RpcServerArgs](crate::args::RpcServerArgs) type. +pub trait RethRpcConfig { + /// Returns whether ipc is enabled. + fn is_ipc_enabled(&self) -> bool; + + /// The configured ethereum RPC settings. + fn eth_config(&self) -> EthConfig; + + // TODO extract more functions from RpcServerArgs +} + +/// A trait that allows further customization of the RPC server via CLI. +pub trait RethRpcServerArgsExt: fmt::Debug + clap::Args { + /// Allows for registering additional RPC modules for the transports. + /// + /// This is expected to call the merge functions of [TransportRpcModules], for example + /// [TransportRpcModules::merge_configured] + fn extend_rpc_modules( + &self, + config: &Conf, + registry: &mut RethModuleRegistry, + modules: &mut TransportRpcModules<()>, + ) -> eyre::Result<()> + where + Conf: RethRpcConfig, + Provider: BlockReaderIdExt + + StateProviderFactory + + EvmEnvProvider + + ChainSpecProvider + + ChangeSetReader + + Clone + + Unpin + + 'static, + Pool: TransactionPool + Clone + 'static, + Network: NetworkInfo + Peers + Clone + 'static, + Tasks: TaskSpawner + Clone + 'static, + Events: CanonStateSubscriptions + Clone + 'static; +} + +impl RethRpcServerArgsExt for NoopArgsExt { + fn extend_rpc_modules( + &self, + _config: &Conf, + _registry: &mut RethModuleRegistry, + _modules: &mut TransportRpcModules<()>, + ) -> eyre::Result<()> + where + Conf: RethRpcConfig, + Provider: BlockReaderIdExt + + StateProviderFactory + + EvmEnvProvider + + ChainSpecProvider + + ChangeSetReader + + Clone + + Unpin + + 'static, + Pool: TransactionPool + Clone + 'static, + Network: NetworkInfo + Peers + Clone + 'static, + Tasks: TaskSpawner + Clone + 'static, + Events: CanonStateSubscriptions + Clone + 'static, + { + Ok(()) + } +} diff --git a/bin/reth/src/cli.rs b/bin/reth/src/cli/mod.rs similarity index 93% rename from bin/reth/src/cli.rs rename to bin/reth/src/cli/mod.rs index 9423235ebb..79a1f7b924 100644 --- a/bin/reth/src/cli.rs +++ b/bin/reth/src/cli/mod.rs @@ -1,7 +1,9 @@ //! CLI definition and entrypoint to executable use crate::{ args::utils::genesis_value_parser, - chain, config, db, debug_cmd, + chain, + cli::ext::RethCliExt, + config, db, debug_cmd, dirs::{LogsDir, PlatformPath}, node, p2p, runner::CliRunner, @@ -17,15 +19,17 @@ use reth_tracing::{ }; use std::sync::Arc; +pub mod ext; + /// The main reth cli interface. /// /// This is the entrypoint to the executable. #[derive(Debug, Parser)] #[command(author, version = SHORT_VERSION, long_version = LONG_VERSION, about = "Reth", long_about = None)] -pub struct Cli { +pub struct Cli { /// The command to run #[clap(subcommand)] - command: Commands, + command: Commands, /// The chain this node is running. /// @@ -99,10 +103,10 @@ pub fn run() -> eyre::Result<()> { /// Commands to be executed #[derive(Debug, Subcommand)] -pub enum Commands { +pub enum Commands { /// Start the node #[command(name = "node")] - Node(node::Command), + Node(node::Command), /// Initialize the database from a genesis file. #[command(name = "init")] Init(chain::InitCommand), @@ -225,9 +229,9 @@ mod tests { /// runtime #[test] fn test_parse_help_all_subcommands() { - let reth = Cli::command(); + let reth = Cli::<()>::command(); for sub_command in reth.get_subcommands() { - let err = Cli::try_parse_from(["reth", sub_command.get_name(), "--help"]) + let err = Cli::<()>::try_parse_from(["reth", sub_command.get_name(), "--help"]) .err() .unwrap_or_else(|| { panic!("Failed to parse help message {}", sub_command.get_name()) @@ -243,13 +247,13 @@ mod tests { /// name #[test] fn parse_logs_path() { - let mut reth = Cli::try_parse_from(["reth", "node", "--log.persistent"]).unwrap(); + let mut reth = Cli::<()>::try_parse_from(["reth", "node", "--log.persistent"]).unwrap(); reth.logs.log_directory = reth.logs.log_directory.join(reth.chain.chain.to_string()); let log_dir = reth.logs.log_directory; assert!(log_dir.as_ref().ends_with("reth/logs/mainnet"), "{:?}", log_dir); let mut reth = - Cli::try_parse_from(["reth", "node", "--chain", "sepolia", "--log.persistent"]) + Cli::<()>::try_parse_from(["reth", "node", "--chain", "sepolia", "--log.persistent"]) .unwrap(); reth.logs.log_directory = reth.logs.log_directory.join(reth.chain.chain.to_string()); let log_dir = reth.logs.log_directory; diff --git a/bin/reth/src/node/mod.rs b/bin/reth/src/node/mod.rs index ab6bcb623e..8dc76f0fe2 100644 --- a/bin/reth/src/node/mod.rs +++ b/bin/reth/src/node/mod.rs @@ -2,9 +2,16 @@ //! //! Starts the client use crate::{ - args::{get_secret_key, DebugArgs, DevArgs, NetworkArgs, RpcServerArgs, TxPoolArgs}, - dirs::DataDirPath, + args::{ + get_secret_key, + utils::{genesis_value_parser, parse_socket_address}, + DatabaseArgs, DebugArgs, DevArgs, NetworkArgs, PayloadBuilderArgs, RpcServerArgs, + TxPoolArgs, + }, + cli::ext::RethCliExt, + dirs::{DataDirPath, MaybePlatformPath}, init::init_genesis, + node::cl_events::ConsensusLayerHealthEvents, prometheus_exporter, runner::CliContext, utils::get_single_header, @@ -32,26 +39,30 @@ use reth_interfaces::{ p2p::{ bodies::{client::BodiesClient, downloader::BodyDownloader}, either::EitherDownloader, - headers::downloader::HeaderDownloader, + headers::{client::HeadersClient, downloader::HeaderDownloader}, }, }; use reth_network::{error::NetworkError, NetworkConfig, NetworkHandle, NetworkManager}; use reth_network_api::NetworkInfo; +use reth_payload_builder::PayloadBuilderService; use reth_primitives::{ - stage::StageId, BlockHashOrNumber, BlockNumber, ChainSpec, Head, SealedHeader, H256, + stage::StageId, BlockHashOrNumber, BlockNumber, ChainSpec, DisplayHardforks, Head, + SealedHeader, H256, }; use reth_provider::{ - BlockHashReader, BlockReader, CanonStateSubscriptions, HeaderProvider, ProviderFactory, - StageCheckpointReader, + providers::BlockchainProvider, BlockHashReader, BlockReader, CanonStateSubscriptions, + HeaderProvider, ProviderFactory, StageCheckpointReader, }; +use reth_prune::BatchSizes; use reth_revm::Factory; use reth_revm_inspectors::stack::Hook; use reth_rpc_engine_api::EngineApi; use reth_stages::{ prelude::*, stages::{ - ExecutionStage, ExecutionStageThresholds, HeaderSyncMode, SenderRecoveryStage, - TotalDifficultyStage, + AccountHashingStage, ExecutionStage, ExecutionStageThresholds, HeaderSyncMode, + IndexAccountHistoryStage, IndexStorageHistoryStage, MerkleStage, SenderRecoveryStage, + StorageHashingStage, TotalDifficultyStage, TransactionLookupStage, }, MetricEventsSender, MetricsListener, }; @@ -66,30 +77,12 @@ use std::{ use tokio::sync::{mpsc::unbounded_channel, oneshot, watch}; use tracing::*; -use crate::{ - args::{ - utils::{genesis_value_parser, parse_socket_address}, - DatabaseArgs, PayloadBuilderArgs, - }, - dirs::MaybePlatformPath, - node::cl_events::ConsensusLayerHealthEvents, -}; -use reth_interfaces::p2p::headers::client::HeadersClient; -use reth_payload_builder::PayloadBuilderService; -use reth_primitives::DisplayHardforks; -use reth_provider::providers::BlockchainProvider; -use reth_prune::BatchSizes; -use reth_stages::stages::{ - AccountHashingStage, IndexAccountHistoryStage, IndexStorageHistoryStage, MerkleStage, - StorageHashingStage, TransactionLookupStage, -}; - pub mod cl_events; pub mod events; /// Start the node #[derive(Debug, Parser)] -pub struct Command { +pub struct Command { /// The path to the data dir for all reth files and subdirectories. /// /// Defaults to the OS-specific data directory: @@ -134,7 +127,7 @@ pub struct Command { network: NetworkArgs, #[clap(flatten)] - rpc: RpcServerArgs, + rpc: RpcServerArgs, #[clap(flatten)] txpool: TxPoolArgs, @@ -820,60 +813,61 @@ async fn run_network_until_shutdown( #[cfg(test)] mod tests { - use reth_primitives::DEV; - use super::*; + use reth_primitives::DEV; use std::{net::IpAddr, path::Path}; #[test] fn parse_help_node_command() { - let err = Command::try_parse_from(["reth", "--help"]).unwrap_err(); + let err = Command::<()>::try_parse_from(["reth", "--help"]).unwrap_err(); assert_eq!(err.kind(), clap::error::ErrorKind::DisplayHelp); } #[test] fn parse_common_node_command_chain_args() { for chain in ["mainnet", "sepolia", "goerli"] { - let args: Command = Command::parse_from(["reth", "--chain", chain]); + let args: Command = Command::<()>::parse_from(["reth", "--chain", chain]); assert_eq!(args.chain.chain, chain.parse().unwrap()); } } #[test] fn parse_discovery_port() { - let cmd = Command::try_parse_from(["reth", "--discovery.port", "300"]).unwrap(); + let cmd = Command::<()>::try_parse_from(["reth", "--discovery.port", "300"]).unwrap(); assert_eq!(cmd.network.discovery.port, Some(300)); } #[test] fn parse_port() { let cmd = - Command::try_parse_from(["reth", "--discovery.port", "300", "--port", "99"]).unwrap(); + Command::<()>::try_parse_from(["reth", "--discovery.port", "300", "--port", "99"]) + .unwrap(); assert_eq!(cmd.network.discovery.port, Some(300)); assert_eq!(cmd.network.port, Some(99)); } #[test] fn parse_metrics_port() { - let cmd = Command::try_parse_from(["reth", "--metrics", "9001"]).unwrap(); + let cmd = Command::<()>::try_parse_from(["reth", "--metrics", "9001"]).unwrap(); assert_eq!(cmd.metrics, Some(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 9001))); - let cmd = Command::try_parse_from(["reth", "--metrics", ":9001"]).unwrap(); + let cmd = Command::<()>::try_parse_from(["reth", "--metrics", ":9001"]).unwrap(); assert_eq!(cmd.metrics, Some(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 9001))); - let cmd = Command::try_parse_from(["reth", "--metrics", "localhost:9001"]).unwrap(); + let cmd = Command::<()>::try_parse_from(["reth", "--metrics", "localhost:9001"]).unwrap(); assert_eq!(cmd.metrics, Some(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 9001))); } #[test] fn parse_config_path() { - let cmd = Command::try_parse_from(["reth", "--config", "my/path/to/reth.toml"]).unwrap(); + let cmd = + Command::<()>::try_parse_from(["reth", "--config", "my/path/to/reth.toml"]).unwrap(); // always store reth.toml in the data dir, not the chain specific data dir let data_dir = cmd.datadir.unwrap_or_chain_default(cmd.chain.chain); let config_path = cmd.config.unwrap_or(data_dir.config_path()); assert_eq!(config_path, Path::new("my/path/to/reth.toml")); - let cmd = Command::try_parse_from(["reth"]).unwrap(); + let cmd = Command::<()>::try_parse_from(["reth"]).unwrap(); // always store reth.toml in the data dir, not the chain specific data dir let data_dir = cmd.datadir.unwrap_or_chain_default(cmd.chain.chain); @@ -883,12 +877,12 @@ mod tests { #[test] fn parse_db_path() { - let cmd = Command::try_parse_from(["reth"]).unwrap(); + let cmd = Command::<()>::try_parse_from(["reth"]).unwrap(); let data_dir = cmd.datadir.unwrap_or_chain_default(cmd.chain.chain); let db_path = data_dir.db_path(); assert!(db_path.ends_with("reth/mainnet/db"), "{:?}", cmd.config); - let cmd = Command::try_parse_from(["reth", "--datadir", "my/custom/path"]).unwrap(); + let cmd = Command::<()>::try_parse_from(["reth", "--datadir", "my/custom/path"]).unwrap(); let data_dir = cmd.datadir.unwrap_or_chain_default(cmd.chain.chain); let db_path = data_dir.db_path(); assert_eq!(db_path, Path::new("my/custom/path/db")); @@ -896,7 +890,7 @@ mod tests { #[test] fn parse_dev() { - let cmd = Command::parse_from(["reth", "--dev"]); + let cmd = Command::<()>::parse_from(["reth", "--dev"]); let chain = DEV.clone(); assert_eq!(cmd.chain.chain, chain.chain); assert_eq!(cmd.chain.genesis_hash, chain.genesis_hash); diff --git a/crates/rpc/rpc-builder/src/lib.rs b/crates/rpc/rpc-builder/src/lib.rs index 2ec4ea0a33..c66636babb 100644 --- a/crates/rpc/rpc-builder/src/lib.rs +++ b/crates/rpc/rpc-builder/src/lib.rs @@ -89,7 +89,7 @@ //! let builder = RpcModuleBuilder::new(provider, pool, network, TokioTaskExecutor::default(), events); //! //! // configure the server modules -//! let (modules, auth_module) = builder.build_with_auth_server(transports, engine_api); +//! let (modules, auth_module, _registry) = builder.build_with_auth_server(transports, engine_api); //! //! // start the servers //! let auth_config = AuthServerConfig::builder(JwtSecret::random()).build(); @@ -343,7 +343,11 @@ where self, module_config: TransportRpcModuleConfig, engine: EngineApi, - ) -> (TransportRpcModules<()>, AuthRpcModule) + ) -> ( + TransportRpcModules<()>, + AuthRpcModule, + RethModuleRegistry, + ) where EngineApi: EngineApiServer, { @@ -369,7 +373,7 @@ where let auth_module = registry.create_auth_module(engine); - (modules, auth_module) + (modules, auth_module, registry) } /// Configures all [RpcModule]s specific to the given [TransportRpcModuleConfig] which can be @@ -1026,12 +1030,12 @@ where } /// Returns the configured [EthHandlers] or creates it if it does not exist yet - fn eth_handlers(&mut self) -> EthHandlers { + pub fn eth_handlers(&mut self) -> EthHandlers { self.with_eth(|handlers| handlers.clone()) } /// Returns the configured [EthApi] or creates it if it does not exist yet - fn eth_api(&mut self) -> EthApi { + pub fn eth_api(&mut self) -> EthApi { self.with_eth(|handlers| handlers.api.clone()) } } @@ -1456,6 +1460,65 @@ impl TransportRpcModules<()> { &self.config } + /// Merge the given Methods in the configured http methods. + /// + /// Fails if any of the methods in other is present already. + /// + /// Returns Ok(false) if no http transport is configured. + pub fn merge_http( + &mut self, + other: impl Into, + ) -> Result { + if let Some(ref mut http) = self.http { + return http.merge(other.into()).map(|_| true) + } + Ok(false) + } + + /// Merge the given Methods in the configured ws methods. + /// + /// Fails if any of the methods in other is present already. + /// + /// Returns Ok(false) if no http transport is configured. + pub fn merge_ws( + &mut self, + other: impl Into, + ) -> Result { + if let Some(ref mut ws) = self.ws { + return ws.merge(other.into()).map(|_| true) + } + Ok(false) + } + + /// Merge the given Methods in the configured ipc methods. + /// + /// Fails if any of the methods in other is present already. + /// + /// Returns Ok(false) if no ipc transport is configured. + pub fn merge_ipc( + &mut self, + other: impl Into, + ) -> Result { + if let Some(ref mut http) = self.http { + return http.merge(other.into()).map(|_| true) + } + Ok(false) + } + + /// Merge the given Methods in all configured methods. + /// + /// Fails if any of the methods in other is present already. + pub fn merge_configured( + &mut self, + other: impl Into, + ) -> Result<(), jsonrpsee::core::error::Error> { + let other = other.into(); + self.merge_http(other.clone())?; + self.merge_ws(other.clone())?; + self.merge_ipc(other.clone())?; + Ok(()) + } + /// Convenience function for starting a server pub async fn start_server(self, builder: RpcServerConfig) -> Result { builder.start(self).await