mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-01-10 15:58:27 -05:00
refactor(rpc): add builder pattern for EthHandlers (#9035)
This commit is contained in:
@@ -1,16 +1,27 @@
|
||||
use crate::RpcModuleConfig;
|
||||
use reth_evm::ConfigureEvm;
|
||||
use reth_network_api::{NetworkInfo, Peers};
|
||||
use reth_provider::{
|
||||
AccountReader, BlockReaderIdExt, CanonStateSubscriptions, ChainSpecProvider, ChangeSetReader,
|
||||
EvmEnvProvider, StateProviderFactory,
|
||||
};
|
||||
use reth_rpc::{
|
||||
eth::{
|
||||
cache::{EthStateCache, EthStateCacheConfig},
|
||||
gas_oracle::GasPriceOracleConfig,
|
||||
EthFilterConfig, FeeHistoryCacheConfig, RPC_DEFAULT_GAS_CAP,
|
||||
cache::{cache_new_blocks_task, EthStateCache, EthStateCacheConfig},
|
||||
fee_history_cache_new_blocks_task,
|
||||
gas_oracle::{GasPriceOracle, GasPriceOracleConfig},
|
||||
traits::RawTransactionForwarder,
|
||||
EthFilterConfig, FeeHistoryCache, FeeHistoryCacheConfig, RPC_DEFAULT_GAS_CAP,
|
||||
},
|
||||
EthApi, EthFilter, EthPubSub,
|
||||
};
|
||||
use reth_rpc_server_types::constants::{
|
||||
default_max_tracing_requests, DEFAULT_MAX_BLOCKS_PER_FILTER, DEFAULT_MAX_LOGS_PER_RESPONSE,
|
||||
};
|
||||
use reth_tasks::pool::BlockingTaskPool;
|
||||
use reth_tasks::{pool::BlockingTaskPool, TaskSpawner};
|
||||
use reth_transaction_pool::TransactionPool;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::sync::Arc;
|
||||
|
||||
/// All handlers for the `eth` namespace
|
||||
#[derive(Debug, Clone)]
|
||||
@@ -27,6 +38,199 @@ pub struct EthHandlers<Provider, Pool, Network, Events, EvmConfig> {
|
||||
pub blocking_task_pool: BlockingTaskPool,
|
||||
}
|
||||
|
||||
/// Configuration for `EthHandlersBuilder`
|
||||
#[derive(Clone, Debug)]
|
||||
pub(crate) struct EthHandlersConfig<Provider, Pool, Network, Tasks, Events, EvmConfig> {
|
||||
/// The provider for blockchain data, responsible for reading blocks, accounts, state, etc.
|
||||
pub(crate) provider: Provider,
|
||||
/// The transaction pool for managing pending transactions.
|
||||
pub(crate) pool: Pool,
|
||||
/// The network information, handling peer connections and network state.
|
||||
pub(crate) network: Network,
|
||||
/// The task executor for spawning asynchronous tasks.
|
||||
pub(crate) executor: Tasks,
|
||||
/// The event subscriptions for canonical state changes.
|
||||
pub(crate) events: Events,
|
||||
/// The EVM configuration for Ethereum Virtual Machine settings.
|
||||
pub(crate) evm_config: EvmConfig,
|
||||
/// An optional forwarder for raw transactions.
|
||||
pub(crate) eth_raw_transaction_forwarder: Option<Arc<dyn RawTransactionForwarder>>,
|
||||
}
|
||||
|
||||
/// Represents the builder for the `EthHandlers` struct, used to configure and create instances of
|
||||
/// `EthHandlers`.
|
||||
#[derive(Debug, Clone)]
|
||||
pub(crate) struct EthHandlersBuilder<Provider, Pool, Network, Tasks, Events, EvmConfig> {
|
||||
eth_handlers_config: EthHandlersConfig<Provider, Pool, Network, Tasks, Events, EvmConfig>,
|
||||
/// Configuration for the RPC module
|
||||
rpc_config: RpcModuleConfig,
|
||||
}
|
||||
|
||||
impl<Provider, Pool, Network, Tasks, Events, EvmConfig>
|
||||
EthHandlersBuilder<Provider, Pool, Network, Tasks, Events, EvmConfig>
|
||||
where
|
||||
Provider: BlockReaderIdExt
|
||||
+ AccountReader
|
||||
+ StateProviderFactory
|
||||
+ EvmEnvProvider
|
||||
+ ChainSpecProvider
|
||||
+ ChangeSetReader
|
||||
+ Clone
|
||||
+ Unpin
|
||||
+ 'static,
|
||||
Pool: TransactionPool + Clone + 'static,
|
||||
Network: NetworkInfo + Peers + Clone + 'static,
|
||||
Tasks: TaskSpawner + Clone + 'static,
|
||||
Events: CanonStateSubscriptions + Clone + 'static,
|
||||
EvmConfig: ConfigureEvm + 'static,
|
||||
{
|
||||
/// Creates a new `EthHandlersBuilder` with the provided components.
|
||||
pub(crate) const fn new(
|
||||
eth_handlers_config: EthHandlersConfig<Provider, Pool, Network, Tasks, Events, EvmConfig>,
|
||||
rpc_config: RpcModuleConfig,
|
||||
) -> Self {
|
||||
Self { eth_handlers_config, rpc_config }
|
||||
}
|
||||
|
||||
/// Builds and returns an `EthHandlers` instance.
|
||||
pub(crate) fn build(self) -> EthHandlers<Provider, Pool, Network, Events, EvmConfig> {
|
||||
// Initialize the cache
|
||||
let cache = self.init_cache();
|
||||
|
||||
// Initialize the fee history cache
|
||||
let fee_history_cache = self.init_fee_history_cache(&cache);
|
||||
|
||||
// Spawn background tasks for cache
|
||||
self.spawn_cache_tasks(&cache, &fee_history_cache);
|
||||
|
||||
// Initialize the gas oracle
|
||||
let gas_oracle = self.init_gas_oracle(&cache);
|
||||
|
||||
// Initialize the blocking task pool
|
||||
let blocking_task_pool = self.init_blocking_task_pool();
|
||||
|
||||
// Initialize the Eth API
|
||||
let api = self.init_api(&cache, gas_oracle, &fee_history_cache, &blocking_task_pool);
|
||||
|
||||
// Initialize the filter
|
||||
let filter = self.init_filter(&cache);
|
||||
|
||||
// Initialize the pubsub
|
||||
let pubsub = self.init_pubsub();
|
||||
|
||||
EthHandlers { api, cache, filter, pubsub, blocking_task_pool }
|
||||
}
|
||||
|
||||
/// Initializes the `EthStateCache`.
|
||||
fn init_cache(&self) -> EthStateCache {
|
||||
EthStateCache::spawn_with(
|
||||
self.eth_handlers_config.provider.clone(),
|
||||
self.rpc_config.eth.cache.clone(),
|
||||
self.eth_handlers_config.executor.clone(),
|
||||
self.eth_handlers_config.evm_config.clone(),
|
||||
)
|
||||
}
|
||||
|
||||
/// Initializes the `FeeHistoryCache`.
|
||||
fn init_fee_history_cache(&self, cache: &EthStateCache) -> FeeHistoryCache {
|
||||
FeeHistoryCache::new(cache.clone(), self.rpc_config.eth.fee_history_cache.clone())
|
||||
}
|
||||
|
||||
/// Spawns background tasks for updating caches.
|
||||
fn spawn_cache_tasks(&self, cache: &EthStateCache, fee_history_cache: &FeeHistoryCache) {
|
||||
// Get the stream of new canonical blocks
|
||||
let new_canonical_blocks = self.eth_handlers_config.events.canonical_state_stream();
|
||||
|
||||
// Clone the cache for the task
|
||||
let cache_clone = cache.clone();
|
||||
|
||||
// Spawn a critical task to update the cache with new blocks
|
||||
self.eth_handlers_config.executor.spawn_critical(
|
||||
"cache canonical blocks task",
|
||||
Box::pin(async move {
|
||||
cache_new_blocks_task(cache_clone, new_canonical_blocks).await;
|
||||
}),
|
||||
);
|
||||
|
||||
// Get another stream of new canonical blocks
|
||||
let new_canonical_blocks = self.eth_handlers_config.events.canonical_state_stream();
|
||||
|
||||
// Clone the fee history cache for the task
|
||||
let fhc_clone = fee_history_cache.clone();
|
||||
|
||||
// Clone the provider for the task
|
||||
let provider_clone = self.eth_handlers_config.provider.clone();
|
||||
|
||||
// Spawn a critical task to update the fee history cache with new blocks
|
||||
self.eth_handlers_config.executor.spawn_critical(
|
||||
"cache canonical blocks for fee history task",
|
||||
Box::pin(async move {
|
||||
fee_history_cache_new_blocks_task(fhc_clone, new_canonical_blocks, provider_clone)
|
||||
.await;
|
||||
}),
|
||||
);
|
||||
}
|
||||
|
||||
/// Initializes the `GasPriceOracle`.
|
||||
fn init_gas_oracle(&self, cache: &EthStateCache) -> GasPriceOracle<Provider> {
|
||||
GasPriceOracle::new(
|
||||
self.eth_handlers_config.provider.clone(),
|
||||
self.rpc_config.eth.gas_oracle.clone(),
|
||||
cache.clone(),
|
||||
)
|
||||
}
|
||||
|
||||
/// Initializes the `BlockingTaskPool`.
|
||||
fn init_blocking_task_pool(&self) -> BlockingTaskPool {
|
||||
BlockingTaskPool::build().expect("failed to build tracing pool")
|
||||
}
|
||||
|
||||
/// Initializes the `EthApi`.
|
||||
fn init_api(
|
||||
&self,
|
||||
cache: &EthStateCache,
|
||||
gas_oracle: GasPriceOracle<Provider>,
|
||||
fee_history_cache: &FeeHistoryCache,
|
||||
blocking_task_pool: &BlockingTaskPool,
|
||||
) -> EthApi<Provider, Pool, Network, EvmConfig> {
|
||||
EthApi::with_spawner(
|
||||
self.eth_handlers_config.provider.clone(),
|
||||
self.eth_handlers_config.pool.clone(),
|
||||
self.eth_handlers_config.network.clone(),
|
||||
cache.clone(),
|
||||
gas_oracle,
|
||||
self.rpc_config.eth.rpc_gas_cap,
|
||||
Box::new(self.eth_handlers_config.executor.clone()),
|
||||
blocking_task_pool.clone(),
|
||||
fee_history_cache.clone(),
|
||||
self.eth_handlers_config.evm_config.clone(),
|
||||
self.eth_handlers_config.eth_raw_transaction_forwarder.clone(),
|
||||
)
|
||||
}
|
||||
|
||||
/// Initializes the `EthFilter`.
|
||||
fn init_filter(&self, cache: &EthStateCache) -> EthFilter<Provider, Pool> {
|
||||
EthFilter::new(
|
||||
self.eth_handlers_config.provider.clone(),
|
||||
self.eth_handlers_config.pool.clone(),
|
||||
cache.clone(),
|
||||
self.rpc_config.eth.filter_config(),
|
||||
Box::new(self.eth_handlers_config.executor.clone()),
|
||||
)
|
||||
}
|
||||
|
||||
/// Initializes the `EthPubSub`.
|
||||
fn init_pubsub(&self) -> EthPubSub<Provider, Pool, Events, Network> {
|
||||
EthPubSub::with_spawner(
|
||||
self.eth_handlers_config.provider.clone(),
|
||||
self.eth_handlers_config.pool.clone(),
|
||||
self.eth_handlers_config.events.clone(),
|
||||
self.eth_handlers_config.network.clone(),
|
||||
Box::new(self.eth_handlers_config.executor.clone()),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
/// Additional config values for the eth namespace.
|
||||
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
|
||||
pub struct EthConfig {
|
||||
|
||||
@@ -156,7 +156,10 @@
|
||||
#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
|
||||
|
||||
use crate::{
|
||||
auth::AuthRpcModule, cors::CorsDomainError, error::WsHttpSamePortError,
|
||||
auth::AuthRpcModule,
|
||||
cors::CorsDomainError,
|
||||
error::WsHttpSamePortError,
|
||||
eth::{EthHandlersBuilder, EthHandlersConfig},
|
||||
metrics::RpcRequestMetrics,
|
||||
};
|
||||
use error::{ConflictingModules, RpcError, ServerKind};
|
||||
@@ -175,22 +178,13 @@ use reth_provider::{
|
||||
ChangeSetReader, EvmEnvProvider, StateProviderFactory,
|
||||
};
|
||||
use reth_rpc::{
|
||||
eth::{
|
||||
cache::{cache_new_blocks_task, EthStateCache},
|
||||
fee_history_cache_new_blocks_task,
|
||||
gas_oracle::GasPriceOracle,
|
||||
traits::RawTransactionForwarder,
|
||||
EthBundle, FeeHistoryCache,
|
||||
},
|
||||
AdminApi, DebugApi, EngineEthApi, EthApi, EthFilter, EthPubSub, EthSubscriptionIdProvider,
|
||||
NetApi, OtterscanApi, RPCApi, RethApi, TraceApi, TxPoolApi, Web3Api,
|
||||
eth::{cache::EthStateCache, traits::RawTransactionForwarder, EthBundle},
|
||||
AdminApi, DebugApi, EngineEthApi, EthApi, EthSubscriptionIdProvider, NetApi, OtterscanApi,
|
||||
RPCApi, RethApi, TraceApi, TxPoolApi, Web3Api,
|
||||
};
|
||||
use reth_rpc_api::servers::*;
|
||||
use reth_rpc_layer::{AuthLayer, Claims, JwtAuthValidator, JwtSecret};
|
||||
use reth_tasks::{
|
||||
pool::{BlockingTaskGuard, BlockingTaskPool},
|
||||
TaskSpawner, TokioTaskExecutor,
|
||||
};
|
||||
use reth_tasks::{pool::BlockingTaskGuard, TaskSpawner, TokioTaskExecutor};
|
||||
use reth_transaction_pool::{noop::NoopTransactionPool, TransactionPool};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::{
|
||||
@@ -1001,7 +995,7 @@ where
|
||||
///
|
||||
/// This will spawn the required service tasks for [`EthApi`] for:
|
||||
/// - [`EthStateCache`]
|
||||
/// - [`FeeHistoryCache`]
|
||||
/// - [`reth_rpc::eth::FeeHistoryCache`]
|
||||
fn with_eth<F, R>(&mut self, f: F) -> R
|
||||
where
|
||||
F: FnOnce(&EthHandlers<Provider, Pool, Network, Events, EvmConfig>) -> R,
|
||||
@@ -1013,71 +1007,19 @@ where
|
||||
}
|
||||
|
||||
fn init_eth(&self) -> EthHandlers<Provider, Pool, Network, Events, EvmConfig> {
|
||||
let cache = EthStateCache::spawn_with(
|
||||
self.provider.clone(),
|
||||
self.config.eth.cache.clone(),
|
||||
self.executor.clone(),
|
||||
self.evm_config.clone(),
|
||||
);
|
||||
let gas_oracle = GasPriceOracle::new(
|
||||
self.provider.clone(),
|
||||
self.config.eth.gas_oracle.clone(),
|
||||
cache.clone(),
|
||||
);
|
||||
let new_canonical_blocks = self.events.canonical_state_stream();
|
||||
let c = cache.clone();
|
||||
|
||||
self.executor.spawn_critical(
|
||||
"cache canonical blocks task",
|
||||
Box::pin(async move {
|
||||
cache_new_blocks_task(c, new_canonical_blocks).await;
|
||||
}),
|
||||
);
|
||||
|
||||
let fee_history_cache =
|
||||
FeeHistoryCache::new(cache.clone(), self.config.eth.fee_history_cache.clone());
|
||||
let new_canonical_blocks = self.events.canonical_state_stream();
|
||||
let fhc = fee_history_cache.clone();
|
||||
let provider_clone = self.provider.clone();
|
||||
self.executor.spawn_critical(
|
||||
"cache canonical blocks for fee history task",
|
||||
Box::pin(async move {
|
||||
fee_history_cache_new_blocks_task(fhc, new_canonical_blocks, provider_clone).await;
|
||||
}),
|
||||
);
|
||||
|
||||
let executor = Box::new(self.executor.clone());
|
||||
let blocking_task_pool = BlockingTaskPool::build().expect("failed to build tracing pool");
|
||||
let api = EthApi::with_spawner(
|
||||
self.provider.clone(),
|
||||
self.pool.clone(),
|
||||
self.network.clone(),
|
||||
cache.clone(),
|
||||
gas_oracle,
|
||||
self.config.eth.rpc_gas_cap,
|
||||
executor.clone(),
|
||||
blocking_task_pool.clone(),
|
||||
fee_history_cache,
|
||||
self.evm_config.clone(),
|
||||
self.eth_raw_transaction_forwarder.clone(),
|
||||
);
|
||||
let filter = EthFilter::new(
|
||||
self.provider.clone(),
|
||||
self.pool.clone(),
|
||||
cache.clone(),
|
||||
self.config.eth.filter_config(),
|
||||
executor.clone(),
|
||||
);
|
||||
|
||||
let pubsub = EthPubSub::with_spawner(
|
||||
self.provider.clone(),
|
||||
self.pool.clone(),
|
||||
self.events.clone(),
|
||||
self.network.clone(),
|
||||
executor,
|
||||
);
|
||||
|
||||
EthHandlers { api, cache, filter, pubsub, blocking_task_pool }
|
||||
EthHandlersBuilder::new(
|
||||
EthHandlersConfig {
|
||||
provider: self.provider.clone(),
|
||||
pool: self.pool.clone(),
|
||||
network: self.network.clone(),
|
||||
executor: self.executor.clone(),
|
||||
events: self.events.clone(),
|
||||
evm_config: self.evm_config.clone(),
|
||||
eth_raw_transaction_forwarder: self.eth_raw_transaction_forwarder.clone(),
|
||||
},
|
||||
self.config.clone(),
|
||||
)
|
||||
.build()
|
||||
}
|
||||
|
||||
/// Returns the configured [`EthHandlers`] or creates it if it does not exist yet
|
||||
|
||||
Reference in New Issue
Block a user