From cb318192ba4be1b3b2254f4873a6e1b7153ed8f2 Mon Sep 17 00:00:00 2001 From: Aditya Pandey Date: Sat, 25 Mar 2023 15:57:52 +0530 Subject: [PATCH] Putting RPC tracing calls behind semaphore (#1914) Co-authored-by: Matthias Seitz --- crates/rpc/rpc-builder/src/eth.rs | 11 ++++++++++- crates/rpc/rpc-builder/src/lib.rs | 33 +++++++++++++++++++++---------- crates/rpc/rpc/src/call_guard.rs | 21 ++++++++++++++++++++ crates/rpc/rpc/src/debug.rs | 9 ++++++--- crates/rpc/rpc/src/lib.rs | 2 ++ crates/rpc/rpc/src/trace.rs | 28 ++++++++++++++++++++++---- 6 files changed, 86 insertions(+), 18 deletions(-) create mode 100644 crates/rpc/rpc/src/call_guard.rs diff --git a/crates/rpc/rpc-builder/src/eth.rs b/crates/rpc/rpc-builder/src/eth.rs index 97546de877..43b3e7c03d 100644 --- a/crates/rpc/rpc-builder/src/eth.rs +++ b/crates/rpc/rpc-builder/src/eth.rs @@ -18,8 +18,17 @@ pub struct EthHandlers { } /// Additional config values for the eth namespace -#[derive(Debug, Clone, Eq, PartialEq, Default, Serialize, Deserialize)] +#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)] pub struct EthConfig { /// Settings for the caching layer pub cache: EthStateCacheConfig, + + /// The maximum number of tracing calls that can be executed in concurrently. + pub max_tracing_requests: usize, +} + +impl Default for EthConfig { + fn default() -> Self { + Self { cache: EthStateCacheConfig::default(), max_tracing_requests: 10 } + } } diff --git a/crates/rpc/rpc-builder/src/lib.rs b/crates/rpc/rpc-builder/src/lib.rs index 0cdc8e09dd..1810b26767 100644 --- a/crates/rpc/rpc-builder/src/lib.rs +++ b/crates/rpc/rpc-builder/src/lib.rs @@ -65,7 +65,7 @@ use reth_network_api::{NetworkInfo, Peers}; use reth_provider::{BlockProvider, EvmEnvProvider, StateProviderFactory}; use reth_rpc::{ eth::cache::EthStateCache, AdminApi, DebugApi, EthApi, EthFilter, EthPubSub, - EthSubscriptionIdProvider, NetApi, TraceApi, Web3Api, + EthSubscriptionIdProvider, NetApi, TraceApi, TracingCallGuard, Web3Api, }; use reth_rpc_api::servers::*; use reth_tasks::TaskSpawner; @@ -78,6 +78,7 @@ use std::{ str::FromStr, }; use strum::{AsRefStr, EnumString, EnumVariantNames, ParseError, VariantNames}; + use tower::layer::util::{Identity, Stack}; use tower_http::cors::CorsLayer; use tracing::{instrument, trace}; @@ -451,6 +452,8 @@ pub struct RethModuleRegistry { config: RpcModuleConfig, /// Holds a clone of all the eth namespace handlers eth: Option>, + /// to put trace calls behind semaphore + tracing_call_guard: TracingCallGuard, /// Contains the [Methods] of a module modules: HashMap, } @@ -476,6 +479,7 @@ impl eth: None, executor, modules: Default::default(), + tracing_call_guard: TracingCallGuard::new(config.eth.max_tracing_requests), config, events, } @@ -535,7 +539,9 @@ where let eth_api = self.eth_api(); self.modules.insert( RethRpcModule::Debug, - DebugApi::new(self.client.clone(), eth_api).into_rpc().into(), + DebugApi::new(self.client.clone(), eth_api, self.tracing_call_guard.clone()) + .into_rpc() + .into(), ); self } @@ -586,9 +592,13 @@ where RethRpcModule::Admin => { AdminApi::new(self.network.clone()).into_rpc().into() } - RethRpcModule::Debug => { - DebugApi::new(self.client.clone(), eth_api.clone()).into_rpc().into() - } + RethRpcModule::Debug => DebugApi::new( + self.client.clone(), + eth_api.clone(), + self.tracing_call_guard.clone(), + ) + .into_rpc() + .into(), RethRpcModule::Eth => { // merge all eth handlers let mut module = eth_api.clone().into_rpc(); @@ -600,11 +610,14 @@ where RethRpcModule::Net => { NetApi::new(self.network.clone(), eth_api.clone()).into_rpc().into() } - RethRpcModule::Trace => { - TraceApi::new(self.client.clone(), eth_api.clone(), eth_cache.clone()) - .into_rpc() - .into() - } + RethRpcModule::Trace => TraceApi::new( + self.client.clone(), + eth_api.clone(), + eth_cache.clone(), + self.tracing_call_guard.clone(), + ) + .into_rpc() + .into(), RethRpcModule::Web3 => Web3Api::new(self.network.clone()).into_rpc().into(), }) .clone() diff --git a/crates/rpc/rpc/src/call_guard.rs b/crates/rpc/rpc/src/call_guard.rs new file mode 100644 index 0000000000..0e09a6e762 --- /dev/null +++ b/crates/rpc/rpc/src/call_guard.rs @@ -0,0 +1,21 @@ +use std::sync::Arc; +use tokio::sync::{AcquireError, OwnedSemaphorePermit, Semaphore}; + +/// RPC Tracing call guard semaphore. +/// +/// This is used to restrict the number of concurrent RPC requests to tracing methods like +/// `debug_traceTransaction` because they can consume a lot of memory. +#[derive(Clone, Debug)] +pub struct TracingCallGuard(Arc); + +impl TracingCallGuard { + /// Create a new `TracingCallGuard` with the given maximum number of tracing calls in parallel. + pub fn new(max_tracing_requests: usize) -> Self { + Self(Arc::new(Semaphore::new(max_tracing_requests))) + } + + /// See also [Semaphore::acquire_owned] + pub async fn acquire_owned(self) -> Result { + self.0.acquire_owned().await + } +} diff --git a/crates/rpc/rpc/src/debug.rs b/crates/rpc/rpc/src/debug.rs index e08b2d5e00..04a314a919 100644 --- a/crates/rpc/rpc/src/debug.rs +++ b/crates/rpc/rpc/src/debug.rs @@ -5,7 +5,7 @@ use crate::{ EthTransactions, TransactionSource, }, result::{internal_rpc_err, ToRpcResult}, - EthApiSpec, + EthApiSpec, TracingCallGuard, }; use async_trait::async_trait; use jsonrpsee::core::RpcResult; @@ -35,14 +35,17 @@ pub struct DebugApi { client: Client, /// The implementation of `eth` API eth_api: Eth, + + // restrict the number of concurrent calls to `debug_traceTransaction` + tracing_call_guard: TracingCallGuard, } // === impl DebugApi === impl DebugApi { /// Create a new instance of the [DebugApi] - pub fn new(client: Client, eth: Eth) -> Self { - Self { client, eth_api: eth } + pub fn new(client: Client, eth: Eth, tracing_call_guard: TracingCallGuard) -> Self { + Self { client, eth_api: eth, tracing_call_guard } } } diff --git a/crates/rpc/rpc/src/lib.rs b/crates/rpc/rpc/src/lib.rs index 53a37d4dfd..8be6e3af43 100644 --- a/crates/rpc/rpc/src/lib.rs +++ b/crates/rpc/rpc/src/lib.rs @@ -12,6 +12,7 @@ //! Provides the implementation of all RPC interfaces. mod admin; +mod call_guard; mod debug; mod engine; pub mod eth; @@ -21,6 +22,7 @@ mod trace; mod web3; pub use admin::AdminApi; +pub use call_guard::TracingCallGuard; pub use debug::DebugApi; pub use engine::EngineApi; pub use eth::{EthApi, EthApiSpec, EthFilter, EthPubSub, EthSubscriptionIdProvider}; diff --git a/crates/rpc/rpc/src/trace.rs b/crates/rpc/rpc/src/trace.rs index 3b38c2dd71..4a7d9dbc62 100644 --- a/crates/rpc/rpc/src/trace.rs +++ b/crates/rpc/rpc/src/trace.rs @@ -4,6 +4,7 @@ use crate::{ utils::recover_raw_transaction, EthTransactions, }, result::internal_rpc_err, + TracingCallGuard, }; use async_trait::async_trait; use jsonrpsee::core::RpcResult as Result; @@ -14,7 +15,6 @@ use reth_revm::{ env::tx_env_with_recovered, tracing::{TracingInspector, TracingInspectorConfig}, }; - use reth_rpc_api::TraceApiServer; use reth_rpc_types::{ trace::{filter::TraceFilter, parity::*}, @@ -22,6 +22,7 @@ use reth_rpc_types::{ }; use revm::primitives::{Env, ExecutionResult, ResultAndState}; use std::collections::HashSet; +use tokio::sync::{AcquireError, OwnedSemaphorePermit}; /// `trace` API implementation. /// @@ -34,14 +35,29 @@ pub struct TraceApi { eth_api: Eth, /// The async cache frontend for eth related data eth_cache: EthStateCache, + + // restrict the number of concurrent calls to `trace_*` + tracing_call_guard: TracingCallGuard, } // === impl TraceApi === impl TraceApi { /// Create a new instance of the [TraceApi] - pub fn new(client: Client, eth_api: Eth, eth_cache: EthStateCache) -> Self { - Self { client, eth_api, eth_cache } + pub fn new( + client: Client, + eth_api: Eth, + eth_cache: EthStateCache, + tracing_call_guard: TracingCallGuard, + ) -> Self { + Self { client, eth_api, eth_cache, tracing_call_guard } + } + + /// Acquires a permit to execute a tracing call. + async fn acquire_trace_permit( + &self, + ) -> std::result::Result { + self.tracing_call_guard.clone().acquire_owned().await } } @@ -90,6 +106,7 @@ where trace_types: HashSet, block_id: Option, ) -> EthResult { + let _permit = self.acquire_trace_permit().await; let tx = recover_raw_transaction(tx)?; let (cfg, block, at) = self @@ -123,6 +140,8 @@ where hash: H256, trace_address: Vec, ) -> EthResult> { + let _permit = self.acquire_trace_permit().await; + match self.trace_transaction(hash).await? { None => Ok(None), Some(traces) => { @@ -138,6 +157,8 @@ where &self, hash: H256, ) -> EthResult>> { + let _permit = self.acquire_trace_permit().await; + let (transaction, at) = match self.eth_api.transaction_by_hash_at(hash).await? { None => return Ok(None), Some(res) => res, @@ -152,7 +173,6 @@ where // execute the trace self.trace_at(env, TracingInspectorConfig::default_parity(), at, |inspector, _| { let traces = inspector.into_parity_builder().into_localized_transaction_traces(tx_info); - Ok(Some(traces)) }) }