diff --git a/Cargo.lock b/Cargo.lock index d4f952dbb1..6ca5f9d40e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5073,6 +5073,7 @@ version = "0.1.0" dependencies = [ "hyper", "jsonrpsee", + "reth-interfaces", "reth-ipc", "reth-network-api", "reth-primitives", diff --git a/bin/reth/src/args/rpc_server_args.rs b/bin/reth/src/args/rpc_server_args.rs index c86a7f4a0a..9bef1c268e 100644 --- a/bin/reth/src/args/rpc_server_args.rs +++ b/bin/reth/src/args/rpc_server_args.rs @@ -3,6 +3,7 @@ use crate::dirs::{JwtSecretPath, PlatformPath}; use clap::Args; use jsonrpsee::{core::Error as RpcError, server::ServerHandle}; +use reth_interfaces::events::ChainEventSubscriptions; use reth_network_api::{NetworkInfo, Peers}; use reth_provider::{BlockProvider, EvmEnvProvider, HeaderProvider, StateProviderFactory}; use reth_rpc::{JwtError, JwtSecret}; @@ -108,12 +109,13 @@ impl RpcServerArgs { } /// Convenience function for starting a rpc server with configs which extracted from cli args. - pub(crate) async fn start_rpc_server( + pub(crate) async fn start_rpc_server( &self, client: Client, pool: Pool, network: Network, executor: Tasks, + events: Events, ) -> Result where Client: BlockProvider @@ -126,6 +128,7 @@ impl RpcServerArgs { Pool: TransactionPool + Clone + 'static, Network: NetworkInfo + Peers + Clone + 'static, Tasks: TaskSpawner + Clone + 'static, + Events: ChainEventSubscriptions + Clone + 'static, { reth_rpc_builder::launch( client, @@ -134,6 +137,7 @@ impl RpcServerArgs { self.transport_rpc_module_config(), self.rpc_server_config(), executor, + events, ) .await } diff --git a/bin/reth/src/node/mod.rs b/bin/reth/src/node/mod.rs index 889646a9cf..63b2270dc6 100644 --- a/bin/reth/src/node/mod.rs +++ b/bin/reth/src/node/mod.rs @@ -32,6 +32,7 @@ use reth_interfaces::{ headers::{client::StatusUpdater, downloader::HeaderDownloader}, }, sync::SyncStateUpdater, + test_utils::TestChainEventSubscriptions, }; use reth_network::{ error::NetworkError, FetchClient, NetworkConfig, NetworkHandle, NetworkManager, @@ -208,6 +209,8 @@ impl Command { transaction_pool.clone(), network.clone(), ctx.task_executor.clone(), + // TODO use real implementation + TestChainEventSubscriptions::default(), ) .await?; info!(target: "reth::cli", "Started RPC server"); diff --git a/crates/rpc/rpc-builder/Cargo.toml b/crates/rpc/rpc-builder/Cargo.toml index 2730dd1e6f..b11b197fe7 100644 --- a/crates/rpc/rpc-builder/Cargo.toml +++ b/crates/rpc/rpc-builder/Cargo.toml @@ -9,6 +9,7 @@ description = "Helpers for configuring RPC" [dependencies] # reth reth-ipc = { path = "../ipc" } +reth-interfaces = { path = "../../interfaces" } reth-network-api = { path = "../../net/network-api" } reth-provider = { path = "../../storage/provider" } reth-rpc = { path = "../rpc" } @@ -37,6 +38,7 @@ reth-rpc-api = { path = "../rpc-api", features = ["client"] } reth-transaction-pool = { path = "../../transaction-pool", features = ["test-utils"] } reth-provider = { path = "../../storage/provider", features = ["test-utils"] } reth-network-api = { path = "../../net/network-api", features = ["test-utils"] } +reth-interfaces = { path = "../../interfaces", features = ["test-utils"] } tokio = { version = "1", features = ["rt", "rt-multi-thread"] } serde_json = "1.0.94" diff --git a/crates/rpc/rpc-builder/src/eth.rs b/crates/rpc/rpc-builder/src/eth.rs index eec235cdd8..97546de877 100644 --- a/crates/rpc/rpc-builder/src/eth.rs +++ b/crates/rpc/rpc-builder/src/eth.rs @@ -10,11 +10,11 @@ pub struct EthHandlers { /// Main `eth_` request handler pub api: EthApi, /// The async caching layer used by the eth handlers - pub eth_cache: EthStateCache, + pub cache: EthStateCache, /// Polling based filter handler available on all transports pub filter: EthFilter, /// Handler for subscriptions only available for transports that support it (ws, ipc) - pub pubsub: Option>, + pub pubsub: EthPubSub, } /// Additional config values for the eth namespace diff --git a/crates/rpc/rpc-builder/src/lib.rs b/crates/rpc/rpc-builder/src/lib.rs index b529bbac71..d8c16161f0 100644 --- a/crates/rpc/rpc-builder/src/lib.rs +++ b/crates/rpc/rpc-builder/src/lib.rs @@ -24,16 +24,18 @@ //! Configure only a http server with a selection of [RethRpcModule]s //! //! ``` +//! use reth_interfaces::events::ChainEventSubscriptions; //! use reth_network_api::{NetworkInfo, Peers}; //! use reth_provider::{BlockProvider, StateProviderFactory, EvmEnvProvider}; //! use reth_rpc_builder::{RethRpcModule, RpcModuleBuilder, RpcServerConfig, ServerBuilder, TransportRpcModuleConfig}; //! use reth_tasks::TokioTaskExecutor; //! use reth_transaction_pool::TransactionPool; -//! pub async fn launch(client: Client, pool: Pool, network: Network) +//! pub async fn launch(client: Client, pool: Pool, network: Network, events: Events) //! where //! Client: BlockProvider + StateProviderFactory + EvmEnvProvider + Clone + Unpin + 'static, //! Pool: TransactionPool + Clone + 'static, //! Network: NetworkInfo + Peers + Clone + 'static, +//! Events: ChainEventSubscriptions + Clone + 'static, //! { //! // configure the rpc module per transport //! let transports = TransportRpcModuleConfig::default().with_http(vec![ @@ -42,7 +44,7 @@ //! RethRpcModule::Eth, //! RethRpcModule::Web3, //! ]); -//! let transport_modules = RpcModuleBuilder::new(client, pool, network, TokioTaskExecutor::default()).build(transports); +//! let transport_modules = RpcModuleBuilder::new(client, pool, network, TokioTaskExecutor::default(), events).build(transports); //! let handle = RpcServerConfig::default() //! .with_http(ServerBuilder::default()) //! .start(transport_modules) @@ -60,7 +62,8 @@ use reth_ipc::server::IpcServer; use reth_network_api::{NetworkInfo, Peers}; use reth_provider::{BlockProvider, EvmEnvProvider, StateProviderFactory}; use reth_rpc::{ - AdminApi, DebugApi, EthApi, EthFilter, EthSubscriptionIdProvider, NetApi, TraceApi, Web3Api, + AdminApi, DebugApi, EthApi, EthFilter, EthPubSub, EthSubscriptionIdProvider, NetApi, TraceApi, + Web3Api, }; use reth_rpc_api::servers::*; use reth_transaction_pool::TransactionPool; @@ -90,6 +93,7 @@ mod eth; pub mod constants; pub use crate::eth::{EthConfig, EthHandlers}; use constants::*; +use reth_interfaces::events::ChainEventSubscriptions; use reth_rpc::eth::cache::EthStateCache; use reth_tasks::TaskSpawner; @@ -97,23 +101,25 @@ use reth_tasks::TaskSpawner; mod cors; /// Convenience function for starting a server in one step. -pub async fn launch( +pub async fn launch( client: Client, pool: Pool, network: Network, module_config: impl Into, server_config: impl Into, executor: Tasks, + events: Events, ) -> Result where Client: BlockProvider + StateProviderFactory + EvmEnvProvider + Clone + Unpin + 'static, Pool: TransactionPool + Clone + 'static, Network: NetworkInfo + Peers + Clone + 'static, Tasks: TaskSpawner + Clone + 'static, + Events: ChainEventSubscriptions + Clone + 'static, { let module_config = module_config.into(); let server_config = server_config.into(); - RpcModuleBuilder::new(client, pool, network, executor) + RpcModuleBuilder::new(client, pool, network, executor, events) .build(module_config) .start_server(server_config) .await @@ -123,7 +129,7 @@ where /// /// This is the main entrypoint for up RPC servers. #[derive(Debug)] -pub struct RpcModuleBuilder { +pub struct RpcModuleBuilder { /// The Client type to when creating all rpc handlers client: Client, /// The Pool type to when creating all rpc handlers @@ -132,59 +138,77 @@ pub struct RpcModuleBuilder { network: Network, /// How additional tasks are spawned, for example in the eth pubsub namespace executor: Tasks, + /// Provides access to chain events, such as new blocks, required by pubsub. + events: Events, } // === impl RpcBuilder === -impl RpcModuleBuilder { +impl RpcModuleBuilder { /// Create a new instance of the builder - pub fn new(client: Client, pool: Pool, network: Network, executor: Tasks) -> Self { - Self { client, pool, network, executor } + pub fn new( + client: Client, + pool: Pool, + network: Network, + executor: Tasks, + events: Events, + ) -> Self { + Self { client, pool, network, executor, events } } /// Configure the client instance. - pub fn with_client(self, client: C) -> RpcModuleBuilder + pub fn with_client(self, client: C) -> RpcModuleBuilder where C: BlockProvider + StateProviderFactory + EvmEnvProvider + 'static, { - let Self { pool, network, executor, .. } = self; - RpcModuleBuilder { client, network, pool, executor } + let Self { pool, network, executor, events, .. } = self; + RpcModuleBuilder { client, network, pool, executor, events } } /// Configure the transaction pool instance. - pub fn with_pool

(self, pool: P) -> RpcModuleBuilder + pub fn with_pool

(self, pool: P) -> RpcModuleBuilder where P: TransactionPool + 'static, { - let Self { client, network, executor, .. } = self; - RpcModuleBuilder { client, network, pool, executor } + let Self { client, network, executor, events, .. } = self; + RpcModuleBuilder { client, network, pool, executor, events } } /// Configure the network instance. - pub fn with_network(self, network: N) -> RpcModuleBuilder + pub fn with_network(self, network: N) -> RpcModuleBuilder where N: NetworkInfo + Peers + 'static, { - let Self { client, pool, executor, .. } = self; - RpcModuleBuilder { client, network, pool, executor } + let Self { client, pool, executor, events, .. } = self; + RpcModuleBuilder { client, network, pool, executor, events } } /// Configure the task executor to use for additional tasks. - pub fn with_executor(self, executor: T) -> RpcModuleBuilder + pub fn with_executor(self, executor: T) -> RpcModuleBuilder where T: TaskSpawner + 'static, { - let Self { pool, network, client, .. } = self; - RpcModuleBuilder { client, network, pool, executor } + let Self { pool, network, client, events, .. } = self; + RpcModuleBuilder { client, network, pool, executor, events } + } + + /// Configure the event subscriber instance + pub fn with_events(self, events: E) -> RpcModuleBuilder + where + E: ChainEventSubscriptions + 'static, + { + let Self { client, pool, executor, network, .. } = self; + RpcModuleBuilder { client, network, pool, executor, events } } } -impl RpcModuleBuilder +impl RpcModuleBuilder where Client: BlockProvider + StateProviderFactory + EvmEnvProvider + Clone + Unpin + 'static, Pool: TransactionPool + Clone + 'static, Network: NetworkInfo + Peers + Clone + 'static, Tasks: TaskSpawner + Clone + 'static, + Events: ChainEventSubscriptions + Clone + 'static, { /// Configures all [RpcModule]s specific to the given [TransportRpcModuleConfig] which can be /// used to start the transport server(s). @@ -193,7 +217,7 @@ where pub fn build(self, module_config: TransportRpcModuleConfig) -> TransportRpcModules<()> { let mut modules = TransportRpcModules::default(); - let Self { client, pool, network, executor } = self; + let Self { client, pool, network, executor, events } = self; if !module_config.is_empty() { let TransportRpcModuleConfig { http, ws, ipc, config } = module_config; @@ -203,6 +227,7 @@ where pool, network, executor, + events, config.unwrap_or_default(), ); @@ -215,9 +240,9 @@ where } } -impl Default for RpcModuleBuilder<(), (), (), ()> { +impl Default for RpcModuleBuilder<(), (), (), (), ()> { fn default() -> Self { - RpcModuleBuilder::new((), (), (), ()) + RpcModuleBuilder::new((), (), (), (), ()) } } @@ -321,12 +346,13 @@ impl RpcModuleSelection { /// Note: This will always create new instance of the module handlers and is therefor only /// recommended for launching standalone transports. If multiple transports need to be /// configured it's recommended to use the [RpcModuleBuilder]. - pub fn standalone_module( + pub fn standalone_module( &self, client: Client, pool: Pool, network: Network, executor: Tasks, + events: Events, config: RpcModuleConfig, ) -> RpcModule<()> where @@ -334,8 +360,9 @@ impl RpcModuleSelection { Pool: TransactionPool + Clone + 'static, Network: NetworkInfo + Peers + Clone + 'static, Tasks: TaskSpawner + Clone + 'static, + Events: ChainEventSubscriptions + Clone + 'static, { - let mut registry = RethModuleRegistry::new(client, pool, network, executor, config); + let mut registry = RethModuleRegistry::new(client, pool, network, executor, events, config); registry.module_for(self) } @@ -415,31 +442,44 @@ impl Serialize for RethRpcModule { } /// A Helper type the holds instances of the configured modules. -pub struct RethModuleRegistry { +pub struct RethModuleRegistry { client: Client, pool: Pool, network: Network, executor: Tasks, + events: Events, /// Additional settings for handlers. config: RpcModuleConfig, /// Holds a clone of all the eth namespace handlers - eth: Option>, + eth: Option>, /// Contains the [Methods] of a module modules: HashMap, } // === impl RethModuleRegistry === -impl RethModuleRegistry { +impl + RethModuleRegistry +{ /// Creates a new, empty instance. pub fn new( client: Client, pool: Pool, network: Network, executor: Tasks, + events: Events, config: RpcModuleConfig, ) -> Self { - Self { client, pool, network, eth: None, executor, modules: Default::default(), config } + Self { + client, + pool, + network, + eth: None, + executor, + modules: Default::default(), + config, + events, + } } /// Returns all installed methods @@ -457,7 +497,7 @@ impl RethModuleRegistry RethModuleRegistry +impl RethModuleRegistry where Network: NetworkInfo + Peers + Clone + 'static, { @@ -476,12 +516,13 @@ where } } -impl RethModuleRegistry +impl RethModuleRegistry where Client: BlockProvider + StateProviderFactory + EvmEnvProvider + Clone + Unpin + 'static, Pool: TransactionPool + Clone + 'static, Network: NetworkInfo + Peers + Clone + 'static, Tasks: TaskSpawner + Clone + 'static, + Events: ChainEventSubscriptions + Clone + 'static, { /// Register Eth Namespace pub fn register_eth(&mut self) -> &mut Self { @@ -536,8 +577,8 @@ where &mut self, namespaces: impl Iterator, ) -> Vec { - let eth_api = self.eth_api(); - let eth_cache = self.eth_cache(); + let EthHandlers { api: eth_api, cache: eth_cache, filter: eth_filter, pubsub: eth_pubsub } = + self.with_eth(|eth| eth.clone()); namespaces .map(|namespace| { self.modules @@ -549,7 +590,14 @@ where RethRpcModule::Debug => { DebugApi::new(self.client.clone(), eth_api.clone()).into_rpc().into() } - RethRpcModule::Eth => eth_api.clone().into_rpc().into(), + RethRpcModule::Eth => { + // merge all eth handlers + let mut module = eth_api.clone().into_rpc(); + module.merge(eth_filter.clone().into_rpc()).expect("No conflicts"); + module.merge(eth_pubsub.clone().into_rpc()).expect("No conflicts"); + + module.into() + } RethRpcModule::Net => { NetApi::new(self.network.clone(), eth_api.clone()).into_rpc().into() } @@ -570,16 +618,16 @@ where /// This will spawn exactly one [EthStateCache] service if this is the first time the cache is /// requested. pub fn eth_cache(&mut self) -> EthStateCache { - self.with_eth(|handlers| handlers.eth_cache.clone()) + self.with_eth(|handlers| handlers.cache.clone()) } /// Creates the [EthHandlers] type the first time this is called. fn with_eth(&mut self, f: F) -> R where - F: FnOnce(&EthHandlers) -> R, + F: FnOnce(&EthHandlers) -> R, { if self.eth.is_none() { - let eth_cache = EthStateCache::spawn_with( + let cache = EthStateCache::spawn_with( self.client.clone(), self.config.eth.cache.clone(), self.executor.clone(), @@ -588,13 +636,18 @@ where self.client.clone(), self.pool.clone(), self.network.clone(), - eth_cache.clone(), + cache.clone(), ); let filter = EthFilter::new(self.client.clone(), self.pool.clone()); - // TODO: install pubsub + let pubsub = EthPubSub::new( + self.client.clone(), + self.pool.clone(), + self.events.clone(), + self.network.clone(), + ); - let eth = EthHandlers { api, eth_cache, filter, pubsub: None }; + let eth = EthHandlers { api, cache, filter, pubsub }; self.eth = Some(eth); } f(self.eth.as_ref().expect("exists; qed")) diff --git a/crates/rpc/rpc-builder/tests/it/utils.rs b/crates/rpc/rpc-builder/tests/it/utils.rs index 5e8e8527b0..2c1d6cd842 100644 --- a/crates/rpc/rpc-builder/tests/it/utils.rs +++ b/crates/rpc/rpc-builder/tests/it/utils.rs @@ -1,3 +1,4 @@ +use reth_interfaces::test_utils::TestChainEventSubscriptions; use reth_network_api::test_utils::NoopNetwork; use reth_provider::test_utils::NoopProvider; use reth_rpc_builder::{ @@ -51,11 +52,17 @@ pub async fn launch_http_ws(modules: impl Into) -> RpcServer } /// Returns an [RpcModuleBuilder] with testing components. -pub fn test_rpc_builder() -> RpcModuleBuilder -{ +pub fn test_rpc_builder() -> RpcModuleBuilder< + NoopProvider, + TestPool, + NoopNetwork, + TokioTaskExecutor, + TestChainEventSubscriptions, +> { RpcModuleBuilder::default() .with_client(NoopProvider::default()) .with_pool(testing_pool()) .with_network(NoopNetwork::default()) .with_executor(TokioTaskExecutor::default()) + .with_events(TestChainEventSubscriptions::default()) } diff --git a/crates/rpc/rpc/src/eth/pubsub.rs b/crates/rpc/rpc/src/eth/pubsub.rs index 990f2d7154..20dd300438 100644 --- a/crates/rpc/rpc/src/eth/pubsub.rs +++ b/crates/rpc/rpc/src/eth/pubsub.rs @@ -3,7 +3,8 @@ use crate::eth::logs_utils; use futures::StreamExt; use jsonrpsee::{types::SubscriptionResult, SubscriptionSink}; -use reth_interfaces::{events::ChainEventSubscriptions, sync::SyncStateProvider}; +use reth_interfaces::events::ChainEventSubscriptions; +use reth_network_api::NetworkInfo; use reth_primitives::{filter::FilteredParams, BlockId, TxHash}; use reth_provider::{BlockProvider, EvmEnvProvider}; use reth_rpc_api::EthPubSubApiServer; @@ -60,7 +61,7 @@ where Client: BlockProvider + EvmEnvProvider + Clone + 'static, Pool: TransactionPool + 'static, Events: ChainEventSubscriptions + Clone + 'static, - Network: SyncStateProvider + Clone + 'static, + Network: NetworkInfo + Clone + 'static, { /// Handler for `eth_subscribe` fn subscribe( @@ -91,7 +92,7 @@ async fn handle_accepted( Client: BlockProvider + EvmEnvProvider + Clone + 'static, Pool: TransactionPool + 'static, Events: ChainEventSubscriptions + Clone + 'static, - Network: SyncStateProvider + Clone + 'static, + Network: NetworkInfo + Clone + 'static, { match kind { SubscriptionKind::NewHeads => { @@ -201,7 +202,7 @@ impl EthPubSubInner impl Stream {