feat(rpc): Add RpcModuleConfig and integrate in builder (#1605)

This commit is contained in:
Matthias Seitz
2023-03-02 15:39:00 +01:00
committed by GitHub
parent 6501ce2956
commit 5fbd3f0c9b
5 changed files with 141 additions and 40 deletions

View File

@@ -0,0 +1,25 @@
use reth_rpc::{
eth::cache::{EthStateCache, EthStateCacheConfig},
EthApi, EthFilter, EthPubSub,
};
use serde::{Deserialize, Serialize};
/// All handlers for the `eth` namespace
#[derive(Debug, Clone)]
pub struct EthHandlers<Client, Pool, Network, Events> {
/// Main `eth_` request handler
pub api: EthApi<Client, Pool, Network>,
/// The async caching layer used by the eth handlers
pub eth_cache: EthStateCache,
/// Polling based filter handler available on all transports
pub filter: EthFilter<Client, Pool>,
/// Handler for subscriptions only available for transports that support it (ws, ipc)
pub pubsub: Option<EthPubSub<Client, Pool, Events>>,
}
/// Additional config values for the eth namespace
#[derive(Debug, Clone, Eq, PartialEq, Default, Serialize, Deserialize)]
pub struct EthConfig {
/// Settings for the caching layer
pub cache: EthStateCacheConfig,
}

View File

@@ -64,7 +64,7 @@ use jsonrpsee::{
use reth_ipc::server::IpcServer;
use reth_network_api::{NetworkInfo, Peers};
use reth_provider::{BlockProvider, EvmEnvProvider, HeaderProvider, StateProviderFactory};
use reth_rpc::{AdminApi, DebugApi, EthApi, NetApi, TraceApi, Web3Api};
use reth_rpc::{AdminApi, DebugApi, EthApi, EthFilter, NetApi, TraceApi, Web3Api};
use reth_rpc_api::servers::*;
use reth_transaction_pool::TransactionPool;
use serde::{Deserialize, Serialize, Serializer};
@@ -84,8 +84,12 @@ pub use reth_ipc::server::{Builder as IpcServerBuilder, Endpoint};
/// Auth server utilities.
pub mod auth;
/// Eth utils
mod eth;
/// Common RPC constants.
pub mod constants;
pub use crate::eth::{EthConfig, EthHandlers};
use constants::*;
use reth_rpc::eth::cache::EthStateCache;
use reth_tasks::TaskSpawner;
@@ -204,10 +208,17 @@ where
let Self { client, pool, network, executor } = self;
let mut registry = RethModuleRegistry::new(client, pool, network, executor);
if !module_config.is_empty() {
let TransportRpcModuleConfig { http, ws, ipc } = module_config;
let TransportRpcModuleConfig { http, ws, ipc, config } = module_config;
let mut registry = RethModuleRegistry::new(
client,
pool,
network,
executor,
config.unwrap_or_default(),
);
modules.http = registry.maybe_module(http.as_ref());
modules.ws = registry.maybe_module(ws.as_ref());
modules.ipc = registry.maybe_module(ipc.as_ref());
@@ -223,6 +234,44 @@ impl Default for RpcModuleBuilder<(), (), (), ()> {
}
}
/// Bundles settings for modules
#[derive(Debug, Default, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct RpcModuleConfig {
/// `eth` namespace settings
eth: EthConfig,
}
// === impl RpcModuleConfig ===
impl RpcModuleConfig {
/// Convenience method to create a new [RpcModuleConfigBuilder]
pub fn builder() -> RpcModuleConfigBuilder {
RpcModuleConfigBuilder::default()
}
}
/// Configures [RpcModuleConfig]
#[derive(Default)]
pub struct RpcModuleConfigBuilder {
eth: Option<EthConfig>,
}
// === impl RpcModuleConfigBuilder ===
impl RpcModuleConfigBuilder {
/// Configures a custom eth namespace config
pub fn eth(mut self, eth: EthConfig) -> Self {
self.eth = Some(eth);
self
}
/// Consumes the type and creates the [RpcModuleConfig]
pub fn build(self) -> RpcModuleConfig {
let RpcModuleConfigBuilder { eth } = self;
RpcModuleConfig { eth: eth.unwrap_or_default() }
}
}
/// Describes the modules that should be installed.
///
/// # Example
@@ -244,7 +293,7 @@ pub enum RpcModuleSelection {
Selection(Vec<RethRpcModule>),
}
// === impl RpcModuleConfig ===
// === impl RpcModuleSelection ===
impl RpcModuleSelection {
/// The standard modules to instantiate by default `eth`, `net`, `web3`
@@ -291,6 +340,7 @@ impl RpcModuleSelection {
pool: Pool,
network: Network,
executor: Tasks,
config: RpcModuleConfig,
) -> RpcModule<()>
where
Client: BlockProvider
@@ -304,7 +354,7 @@ impl RpcModuleSelection {
Network: NetworkInfo + Peers + Clone + 'static,
Tasks: TaskSpawner + Clone + 'static,
{
let mut registry = RethModuleRegistry::new(client, pool, network, executor);
let mut registry = RethModuleRegistry::new(client, pool, network, executor, config);
registry.module_for(self)
}
@@ -389,11 +439,10 @@ pub struct RethModuleRegistry<Client, Pool, Network, Tasks> {
pool: Pool,
network: Network,
executor: Tasks,
/// Holds a clone of the async [EthStateCache] channel.
eth_cache: Option<EthStateCache>,
/// Holds a clone of the actual [EthApi] namespace impl since this can be required by other
/// namespaces
eth_api: Option<EthApi<Client, Pool, Network>>,
/// Additional settings for handlers.
config: RpcModuleConfig,
/// Holds a clone of all the eth namespace handlers
eth: Option<EthHandlers<Client, Pool, Network, ()>>,
/// Contains the [Methods] of a module
modules: HashMap<RethRpcModule, Methods>,
}
@@ -402,16 +451,14 @@ pub struct RethModuleRegistry<Client, Pool, Network, Tasks> {
impl<Client, Pool, Network, Tasks> RethModuleRegistry<Client, Pool, Network, Tasks> {
/// Creates a new, empty instance.
pub fn new(client: Client, pool: Pool, network: Network, executor: Tasks) -> Self {
Self {
client,
pool,
network,
eth_api: None,
executor,
modules: Default::default(),
eth_cache: None,
}
pub fn new(
client: Client,
pool: Pool,
network: Network,
executor: Tasks,
config: RpcModuleConfig,
) -> Self {
Self { client, pool, network, eth: None, executor, modules: Default::default(), config }
}
/// Returns all installed methods
@@ -538,25 +585,39 @@ where
/// This will spawn exactly one [EthStateCache] service if this is the first time the cache is
/// requested.
pub fn eth_cache(&mut self) -> EthStateCache {
self.eth_cache
.get_or_insert_with(|| {
EthStateCache::spawn_with(
self.client.clone(),
Default::default(),
self.executor.clone(),
)
})
.clone()
self.with_eth(|handlers| handlers.eth_cache.clone())
}
/// Creates the [EthHandlers] type the first time this is called.
fn with_eth<F, R>(&mut self, f: F) -> R
where
F: FnOnce(&EthHandlers<Client, Pool, Network, ()>) -> R,
{
if self.eth.is_none() {
let eth_cache = EthStateCache::spawn_with(
self.client.clone(),
self.config.eth.cache.clone(),
self.executor.clone(),
);
let api = EthApi::new(
self.client.clone(),
self.pool.clone(),
self.network.clone(),
eth_cache.clone(),
);
let filter = EthFilter::new(self.client.clone(), self.pool.clone());
// TODO: install pubsub
let eth = EthHandlers { api, eth_cache, filter, pubsub: None };
self.eth = Some(eth);
}
f(self.eth.as_ref().expect("exists; qed"))
}
/// Returns the configured [EthApi] or creates it if it does not exist yet
fn eth_api(&mut self) -> EthApi<Client, Pool, Network> {
let cache = self.eth_cache();
self.eth_api
.get_or_insert_with(|| {
EthApi::new(self.client.clone(), self.pool.clone(), self.network.clone(), cache)
})
.clone()
self.with_eth(|handlers| handlers.api.clone())
}
}
@@ -731,7 +792,7 @@ impl RpcServerConfig {
///
/// # Example
///
/// Configure an http transport only
/// Configure a http transport only
///
/// ```
/// use reth_rpc_builder::{RethRpcModule, TransportRpcModuleConfig};
@@ -746,6 +807,8 @@ pub struct TransportRpcModuleConfig {
ws: Option<RpcModuleSelection>,
/// ipc module configuration
ipc: Option<RpcModuleSelection>,
/// Config for the modules
config: Option<RpcModuleConfig>,
}
// === impl TransportRpcModuleConfig ===
@@ -784,6 +847,12 @@ impl TransportRpcModuleConfig {
self
}
/// Sets a custom [RpcModuleConfig] for the configured modules.
pub fn with_config(mut self, config: RpcModuleConfig) -> Self {
self.config = Some(config);
self
}
/// Returns true if no transports are configured
pub fn is_empty(&self) -> bool {
self.http.is_none() && self.ws.is_none() && self.ipc.is_none()
@@ -1058,6 +1127,7 @@ mod tests {
])),
ws: None,
ipc: None,
config: None,
}
)
}

View File

@@ -58,7 +58,6 @@ pub trait EthApiSpec: Send + Sync {
/// the main impls. This way [`EthApi`] is not limited to [`jsonrpsee`] and can be used standalone
/// or in other network handlers (for example ipc).
#[derive(Clone)]
#[allow(missing_debug_implementations)]
pub struct EthApi<Client, Pool, Network> {
/// All nested fields bundled together.
inner: Arc<EthApiInner<Client, Pool, Network>>,
@@ -172,6 +171,12 @@ where
}
}
impl<Client, Pool, Events> std::fmt::Debug for EthApi<Client, Pool, Events> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("EthApi").finish_non_exhaustive()
}
}
#[async_trait]
impl<Client, Pool, Network> EthApiSpec for EthApi<Client, Pool, Network>
where

View File

@@ -32,7 +32,7 @@ type BlockLruCache<L> = MultiConsumerLruCache<H256, Block, L, BlockResponseSende
type EnvLruCache<L> = MultiConsumerLruCache<H256, (CfgEnv, BlockEnv), L, EnvResponseSender>;
/// Settings for the [EthStateCache]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct EthStateCacheConfig {
/// Max number of bytes for cached block data.

View File

@@ -18,7 +18,8 @@ use tokio_stream::{
/// `Eth` pubsub RPC implementation.
///
/// This handles
/// This handles `eth_subscribe` RPC calls.
#[derive(Clone)]
pub struct EthPubSub<Client, Pool, Events> {
/// All nested fields bundled together.
inner: EthPubSubInner<Client, Pool, Events>,