diff --git a/crates/rpc/rpc-builder/src/eth.rs b/crates/rpc/rpc-builder/src/eth.rs index 5d0d064247..224301966b 100644 --- a/crates/rpc/rpc-builder/src/eth.rs +++ b/crates/rpc/rpc-builder/src/eth.rs @@ -1,16 +1,27 @@ +use crate::RpcModuleConfig; +use reth_evm::ConfigureEvm; +use reth_network_api::{NetworkInfo, Peers}; +use reth_provider::{ + AccountReader, BlockReaderIdExt, CanonStateSubscriptions, ChainSpecProvider, ChangeSetReader, + EvmEnvProvider, StateProviderFactory, +}; use reth_rpc::{ eth::{ - cache::{EthStateCache, EthStateCacheConfig}, - gas_oracle::GasPriceOracleConfig, - EthFilterConfig, FeeHistoryCacheConfig, RPC_DEFAULT_GAS_CAP, + cache::{cache_new_blocks_task, EthStateCache, EthStateCacheConfig}, + fee_history_cache_new_blocks_task, + gas_oracle::{GasPriceOracle, GasPriceOracleConfig}, + traits::RawTransactionForwarder, + EthFilterConfig, FeeHistoryCache, FeeHistoryCacheConfig, RPC_DEFAULT_GAS_CAP, }, EthApi, EthFilter, EthPubSub, }; use reth_rpc_server_types::constants::{ default_max_tracing_requests, DEFAULT_MAX_BLOCKS_PER_FILTER, DEFAULT_MAX_LOGS_PER_RESPONSE, }; -use reth_tasks::pool::BlockingTaskPool; +use reth_tasks::{pool::BlockingTaskPool, TaskSpawner}; +use reth_transaction_pool::TransactionPool; use serde::{Deserialize, Serialize}; +use std::sync::Arc; /// All handlers for the `eth` namespace #[derive(Debug, Clone)] @@ -27,6 +38,199 @@ pub struct EthHandlers { pub blocking_task_pool: BlockingTaskPool, } +/// Configuration for `EthHandlersBuilder` +#[derive(Clone, Debug)] +pub(crate) struct EthHandlersConfig { + /// The provider for blockchain data, responsible for reading blocks, accounts, state, etc. + pub(crate) provider: Provider, + /// The transaction pool for managing pending transactions. + pub(crate) pool: Pool, + /// The network information, handling peer connections and network state. + pub(crate) network: Network, + /// The task executor for spawning asynchronous tasks. + pub(crate) executor: Tasks, + /// The event subscriptions for canonical state changes. + pub(crate) events: Events, + /// The EVM configuration for Ethereum Virtual Machine settings. + pub(crate) evm_config: EvmConfig, + /// An optional forwarder for raw transactions. + pub(crate) eth_raw_transaction_forwarder: Option>, +} + +/// Represents the builder for the `EthHandlers` struct, used to configure and create instances of +/// `EthHandlers`. +#[derive(Debug, Clone)] +pub(crate) struct EthHandlersBuilder { + eth_handlers_config: EthHandlersConfig, + /// Configuration for the RPC module + rpc_config: RpcModuleConfig, +} + +impl + EthHandlersBuilder +where + Provider: BlockReaderIdExt + + AccountReader + + StateProviderFactory + + EvmEnvProvider + + ChainSpecProvider + + ChangeSetReader + + Clone + + Unpin + + 'static, + Pool: TransactionPool + Clone + 'static, + Network: NetworkInfo + Peers + Clone + 'static, + Tasks: TaskSpawner + Clone + 'static, + Events: CanonStateSubscriptions + Clone + 'static, + EvmConfig: ConfigureEvm + 'static, +{ + /// Creates a new `EthHandlersBuilder` with the provided components. + pub(crate) const fn new( + eth_handlers_config: EthHandlersConfig, + rpc_config: RpcModuleConfig, + ) -> Self { + Self { eth_handlers_config, rpc_config } + } + + /// Builds and returns an `EthHandlers` instance. + pub(crate) fn build(self) -> EthHandlers { + // Initialize the cache + let cache = self.init_cache(); + + // Initialize the fee history cache + let fee_history_cache = self.init_fee_history_cache(&cache); + + // Spawn background tasks for cache + self.spawn_cache_tasks(&cache, &fee_history_cache); + + // Initialize the gas oracle + let gas_oracle = self.init_gas_oracle(&cache); + + // Initialize the blocking task pool + let blocking_task_pool = self.init_blocking_task_pool(); + + // Initialize the Eth API + let api = self.init_api(&cache, gas_oracle, &fee_history_cache, &blocking_task_pool); + + // Initialize the filter + let filter = self.init_filter(&cache); + + // Initialize the pubsub + let pubsub = self.init_pubsub(); + + EthHandlers { api, cache, filter, pubsub, blocking_task_pool } + } + + /// Initializes the `EthStateCache`. + fn init_cache(&self) -> EthStateCache { + EthStateCache::spawn_with( + self.eth_handlers_config.provider.clone(), + self.rpc_config.eth.cache.clone(), + self.eth_handlers_config.executor.clone(), + self.eth_handlers_config.evm_config.clone(), + ) + } + + /// Initializes the `FeeHistoryCache`. + fn init_fee_history_cache(&self, cache: &EthStateCache) -> FeeHistoryCache { + FeeHistoryCache::new(cache.clone(), self.rpc_config.eth.fee_history_cache.clone()) + } + + /// Spawns background tasks for updating caches. + fn spawn_cache_tasks(&self, cache: &EthStateCache, fee_history_cache: &FeeHistoryCache) { + // Get the stream of new canonical blocks + let new_canonical_blocks = self.eth_handlers_config.events.canonical_state_stream(); + + // Clone the cache for the task + let cache_clone = cache.clone(); + + // Spawn a critical task to update the cache with new blocks + self.eth_handlers_config.executor.spawn_critical( + "cache canonical blocks task", + Box::pin(async move { + cache_new_blocks_task(cache_clone, new_canonical_blocks).await; + }), + ); + + // Get another stream of new canonical blocks + let new_canonical_blocks = self.eth_handlers_config.events.canonical_state_stream(); + + // Clone the fee history cache for the task + let fhc_clone = fee_history_cache.clone(); + + // Clone the provider for the task + let provider_clone = self.eth_handlers_config.provider.clone(); + + // Spawn a critical task to update the fee history cache with new blocks + self.eth_handlers_config.executor.spawn_critical( + "cache canonical blocks for fee history task", + Box::pin(async move { + fee_history_cache_new_blocks_task(fhc_clone, new_canonical_blocks, provider_clone) + .await; + }), + ); + } + + /// Initializes the `GasPriceOracle`. + fn init_gas_oracle(&self, cache: &EthStateCache) -> GasPriceOracle { + GasPriceOracle::new( + self.eth_handlers_config.provider.clone(), + self.rpc_config.eth.gas_oracle.clone(), + cache.clone(), + ) + } + + /// Initializes the `BlockingTaskPool`. + fn init_blocking_task_pool(&self) -> BlockingTaskPool { + BlockingTaskPool::build().expect("failed to build tracing pool") + } + + /// Initializes the `EthApi`. + fn init_api( + &self, + cache: &EthStateCache, + gas_oracle: GasPriceOracle, + fee_history_cache: &FeeHistoryCache, + blocking_task_pool: &BlockingTaskPool, + ) -> EthApi { + EthApi::with_spawner( + self.eth_handlers_config.provider.clone(), + self.eth_handlers_config.pool.clone(), + self.eth_handlers_config.network.clone(), + cache.clone(), + gas_oracle, + self.rpc_config.eth.rpc_gas_cap, + Box::new(self.eth_handlers_config.executor.clone()), + blocking_task_pool.clone(), + fee_history_cache.clone(), + self.eth_handlers_config.evm_config.clone(), + self.eth_handlers_config.eth_raw_transaction_forwarder.clone(), + ) + } + + /// Initializes the `EthFilter`. + fn init_filter(&self, cache: &EthStateCache) -> EthFilter { + EthFilter::new( + self.eth_handlers_config.provider.clone(), + self.eth_handlers_config.pool.clone(), + cache.clone(), + self.rpc_config.eth.filter_config(), + Box::new(self.eth_handlers_config.executor.clone()), + ) + } + + /// Initializes the `EthPubSub`. + fn init_pubsub(&self) -> EthPubSub { + EthPubSub::with_spawner( + self.eth_handlers_config.provider.clone(), + self.eth_handlers_config.pool.clone(), + self.eth_handlers_config.events.clone(), + self.eth_handlers_config.network.clone(), + Box::new(self.eth_handlers_config.executor.clone()), + ) + } +} + /// Additional config values for the eth namespace. #[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)] pub struct EthConfig { diff --git a/crates/rpc/rpc-builder/src/lib.rs b/crates/rpc/rpc-builder/src/lib.rs index 93d4e6b264..b635a13510 100644 --- a/crates/rpc/rpc-builder/src/lib.rs +++ b/crates/rpc/rpc-builder/src/lib.rs @@ -156,7 +156,10 @@ #![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))] use crate::{ - auth::AuthRpcModule, cors::CorsDomainError, error::WsHttpSamePortError, + auth::AuthRpcModule, + cors::CorsDomainError, + error::WsHttpSamePortError, + eth::{EthHandlersBuilder, EthHandlersConfig}, metrics::RpcRequestMetrics, }; use error::{ConflictingModules, RpcError, ServerKind}; @@ -175,22 +178,13 @@ use reth_provider::{ ChangeSetReader, EvmEnvProvider, StateProviderFactory, }; use reth_rpc::{ - eth::{ - cache::{cache_new_blocks_task, EthStateCache}, - fee_history_cache_new_blocks_task, - gas_oracle::GasPriceOracle, - traits::RawTransactionForwarder, - EthBundle, FeeHistoryCache, - }, - AdminApi, DebugApi, EngineEthApi, EthApi, EthFilter, EthPubSub, EthSubscriptionIdProvider, - NetApi, OtterscanApi, RPCApi, RethApi, TraceApi, TxPoolApi, Web3Api, + eth::{cache::EthStateCache, traits::RawTransactionForwarder, EthBundle}, + AdminApi, DebugApi, EngineEthApi, EthApi, EthSubscriptionIdProvider, NetApi, OtterscanApi, + RPCApi, RethApi, TraceApi, TxPoolApi, Web3Api, }; use reth_rpc_api::servers::*; use reth_rpc_layer::{AuthLayer, Claims, JwtAuthValidator, JwtSecret}; -use reth_tasks::{ - pool::{BlockingTaskGuard, BlockingTaskPool}, - TaskSpawner, TokioTaskExecutor, -}; +use reth_tasks::{pool::BlockingTaskGuard, TaskSpawner, TokioTaskExecutor}; use reth_transaction_pool::{noop::NoopTransactionPool, TransactionPool}; use serde::{Deserialize, Serialize}; use std::{ @@ -1001,7 +995,7 @@ where /// /// This will spawn the required service tasks for [`EthApi`] for: /// - [`EthStateCache`] - /// - [`FeeHistoryCache`] + /// - [`reth_rpc::eth::FeeHistoryCache`] fn with_eth(&mut self, f: F) -> R where F: FnOnce(&EthHandlers) -> R, @@ -1013,71 +1007,19 @@ where } fn init_eth(&self) -> EthHandlers { - let cache = EthStateCache::spawn_with( - self.provider.clone(), - self.config.eth.cache.clone(), - self.executor.clone(), - self.evm_config.clone(), - ); - let gas_oracle = GasPriceOracle::new( - self.provider.clone(), - self.config.eth.gas_oracle.clone(), - cache.clone(), - ); - let new_canonical_blocks = self.events.canonical_state_stream(); - let c = cache.clone(); - - self.executor.spawn_critical( - "cache canonical blocks task", - Box::pin(async move { - cache_new_blocks_task(c, new_canonical_blocks).await; - }), - ); - - let fee_history_cache = - FeeHistoryCache::new(cache.clone(), self.config.eth.fee_history_cache.clone()); - let new_canonical_blocks = self.events.canonical_state_stream(); - let fhc = fee_history_cache.clone(); - let provider_clone = self.provider.clone(); - self.executor.spawn_critical( - "cache canonical blocks for fee history task", - Box::pin(async move { - fee_history_cache_new_blocks_task(fhc, new_canonical_blocks, provider_clone).await; - }), - ); - - let executor = Box::new(self.executor.clone()); - let blocking_task_pool = BlockingTaskPool::build().expect("failed to build tracing pool"); - let api = EthApi::with_spawner( - self.provider.clone(), - self.pool.clone(), - self.network.clone(), - cache.clone(), - gas_oracle, - self.config.eth.rpc_gas_cap, - executor.clone(), - blocking_task_pool.clone(), - fee_history_cache, - self.evm_config.clone(), - self.eth_raw_transaction_forwarder.clone(), - ); - let filter = EthFilter::new( - self.provider.clone(), - self.pool.clone(), - cache.clone(), - self.config.eth.filter_config(), - executor.clone(), - ); - - let pubsub = EthPubSub::with_spawner( - self.provider.clone(), - self.pool.clone(), - self.events.clone(), - self.network.clone(), - executor, - ); - - EthHandlers { api, cache, filter, pubsub, blocking_task_pool } + EthHandlersBuilder::new( + EthHandlersConfig { + provider: self.provider.clone(), + pool: self.pool.clone(), + network: self.network.clone(), + executor: self.executor.clone(), + events: self.events.clone(), + evm_config: self.evm_config.clone(), + eth_raw_transaction_forwarder: self.eth_raw_transaction_forwarder.clone(), + }, + self.config.clone(), + ) + .build() } /// Returns the configured [`EthHandlers`] or creates it if it does not exist yet