diff --git a/Cargo.lock b/Cargo.lock index 1d5a1f6b22..119ae278d5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5682,6 +5682,7 @@ dependencies = [ "jsonwebtoken", "pin-project", "rand 0.8.5", + "rayon", "reth-consensus-common", "reth-interfaces", "reth-metrics", @@ -5741,8 +5742,6 @@ version = "0.1.0-alpha.4" dependencies = [ "hyper", "jsonrpsee", - "pin-project", - "rayon", "reth-beacon-consensus", "reth-interfaces", "reth-ipc", diff --git a/crates/rpc/rpc-builder/Cargo.toml b/crates/rpc/rpc-builder/Cargo.toml index aeb1d300d2..7f8869e64d 100644 --- a/crates/rpc/rpc-builder/Cargo.toml +++ b/crates/rpc/rpc-builder/Cargo.toml @@ -34,9 +34,6 @@ strum = { workspace = true, features = ["derive"] } serde = { workspace = true, features = ["derive"] } thiserror.workspace = true tracing.workspace = true -rayon.workspace = true -pin-project.workspace = true -tokio = { workspace = true, features = ["sync"] } [dev-dependencies] reth-tracing = { path = "../../tracing" } diff --git a/crates/rpc/rpc-builder/src/auth.rs b/crates/rpc/rpc-builder/src/auth.rs index 758713bbde..d7d14f963d 100644 --- a/crates/rpc/rpc-builder/src/auth.rs +++ b/crates/rpc/rpc-builder/src/auth.rs @@ -17,7 +17,7 @@ use reth_provider::{ use reth_rpc::{ eth::{cache::EthStateCache, gas_oracle::GasPriceOracle}, AuthLayer, Claims, EngineEthApi, EthApi, EthFilter, EthSubscriptionIdProvider, - JwtAuthValidator, JwtSecret, + JwtAuthValidator, JwtSecret, TracingCallPool, }; use reth_rpc_api::{servers::*, EngineApiServer}; use reth_tasks::TaskSpawner; @@ -64,6 +64,7 @@ where gas_oracle, EthConfig::default().rpc_gas_cap, Box::new(executor.clone()), + TracingCallPool::build().expect("failed to build tracing pool"), ); let eth_filter = EthFilter::new( provider, diff --git a/crates/rpc/rpc-builder/src/eth.rs b/crates/rpc/rpc-builder/src/eth.rs index 34ec4989c6..b372d3be77 100644 --- a/crates/rpc/rpc-builder/src/eth.rs +++ b/crates/rpc/rpc-builder/src/eth.rs @@ -4,7 +4,7 @@ use reth_rpc::{ gas_oracle::GasPriceOracleConfig, RPC_DEFAULT_GAS_CAP, }, - EthApi, EthFilter, EthPubSub, + EthApi, EthFilter, EthPubSub, TracingCallPool, }; use serde::{Deserialize, Serialize}; @@ -25,6 +25,8 @@ pub struct EthHandlers { pub filter: EthFilter, /// Handler for subscriptions only available for transports that support it (ws, ipc) pub pubsub: EthPubSub, + /// The configured tracing call pool + pub tracing_call_pool: TracingCallPool, } /// Additional config values for the eth namespace diff --git a/crates/rpc/rpc-builder/src/lib.rs b/crates/rpc/rpc-builder/src/lib.rs index 4e86181b94..44af9cb5e1 100644 --- a/crates/rpc/rpc-builder/src/lib.rs +++ b/crates/rpc/rpc-builder/src/lib.rs @@ -122,7 +122,8 @@ use reth_rpc::{ gas_oracle::GasPriceOracle, }, AdminApi, DebugApi, EngineEthApi, EthApi, EthFilter, EthPubSub, EthSubscriptionIdProvider, - NetApi, OtterscanApi, RPCApi, RethApi, TraceApi, TracingCallGuard, TxPoolApi, Web3Api, + NetApi, OtterscanApi, RPCApi, RethApi, TraceApi, TracingCallGuard, TracingCallPool, TxPoolApi, + Web3Api, }; use reth_rpc_api::{servers::*, EngineApiServer}; use reth_tasks::{TaskSpawner, TokioTaskExecutor}; @@ -154,9 +155,6 @@ mod eth; /// Common RPC constants. pub mod constants; -/// Additional support for tracing related rpc calls -pub mod tracing_pool; - // Rpc server metrics mod metrics; @@ -816,15 +814,9 @@ where let eth = self.eth_handlers(); self.modules.insert( RethRpcModule::Trace, - TraceApi::new( - self.provider.clone(), - eth.api.clone(), - eth.cache, - Box::new(self.executor.clone()), - self.tracing_call_guard.clone(), - ) - .into_rpc() - .into(), + TraceApi::new(self.provider.clone(), eth.api.clone(), self.tracing_call_guard.clone()) + .into_rpc() + .into(), ); self } @@ -895,8 +887,13 @@ where &mut self, namespaces: impl Iterator, ) -> Vec { - let EthHandlers { api: eth_api, cache: eth_cache, filter: eth_filter, pubsub: eth_pubsub } = - self.with_eth(|eth| eth.clone()); + let EthHandlers { + api: eth_api, + filter: eth_filter, + pubsub: eth_pubsub, + cache: _, + tracing_call_pool: _, + } = self.with_eth(|eth| eth.clone()); // Create a copy, so we can list out all the methods for rpc_ api let namespaces: Vec<_> = namespaces.collect(); @@ -933,8 +930,6 @@ where RethRpcModule::Trace => TraceApi::new( self.provider.clone(), eth_api.clone(), - eth_cache.clone(), - Box::new(self.executor.clone()), self.tracing_call_guard.clone(), ) .into_rpc() @@ -997,6 +992,7 @@ where ); let executor = Box::new(self.executor.clone()); + let tracing_call_pool = TracingCallPool::build().expect("failed to build tracing pool"); let api = EthApi::with_spawner( self.provider.clone(), self.pool.clone(), @@ -1005,6 +1001,7 @@ where gas_oracle, self.config.eth.rpc_gas_cap, executor.clone(), + tracing_call_pool.clone(), ); let filter = EthFilter::new( self.provider.clone(), @@ -1022,7 +1019,7 @@ where executor, ); - let eth = EthHandlers { api, cache, filter, pubsub }; + let eth = EthHandlers { api, cache, filter, pubsub, tracing_call_pool }; self.eth = Some(eth); } f(self.eth.as_ref().expect("exists; qed")) diff --git a/crates/rpc/rpc/Cargo.toml b/crates/rpc/rpc/Cargo.toml index bfb9421f92..dfd86b2710 100644 --- a/crates/rpc/rpc/Cargo.toml +++ b/crates/rpc/rpc/Cargo.toml @@ -48,6 +48,7 @@ tower = "0.4" tokio-stream = { workspace = true, features = ["sync"] } tokio-util = "0.7" pin-project.workspace = true +rayon.workspace = true bytes.workspace = true secp256k1 = { workspace = true, features = ["global-context", "rand-std", "recovery"] } diff --git a/crates/rpc/rpc/src/call_guard.rs b/crates/rpc/rpc/src/call_guard.rs deleted file mode 100644 index bec4ed3ca9..0000000000 --- a/crates/rpc/rpc/src/call_guard.rs +++ /dev/null @@ -1,26 +0,0 @@ -use std::sync::Arc; -use tokio::sync::{AcquireError, OwnedSemaphorePermit, Semaphore}; - -/// RPC Tracing call guard semaphore. -/// -/// This is used to restrict the number of concurrent RPC requests to tracing methods like -/// `debug_traceTransaction` because they can consume a lot of memory and CPU. -#[derive(Clone, Debug)] -pub struct TracingCallGuard(Arc); - -impl TracingCallGuard { - /// Create a new `TracingCallGuard` with the given maximum number of tracing calls in parallel. - pub fn new(max_tracing_requests: u32) -> Self { - Self(Arc::new(Semaphore::new(max_tracing_requests as usize))) - } - - /// See also [Semaphore::acquire_owned] - pub async fn acquire_owned(self) -> Result { - self.0.acquire_owned().await - } - - /// See also [Semaphore::acquire_many_owned] - pub async fn acquire_many_owned(self, n: u32) -> Result { - self.0.acquire_many_owned(n).await - } -} diff --git a/crates/rpc/rpc/src/debug.rs b/crates/rpc/rpc/src/debug.rs index d6abad21fe..3c2da93f61 100644 --- a/crates/rpc/rpc/src/debug.rs +++ b/crates/rpc/rpc/src/debug.rs @@ -40,8 +40,8 @@ use revm_primitives::{ db::{DatabaseCommit, DatabaseRef}, BlockEnv, CfgEnv, }; -use std::{future::Future, sync::Arc}; -use tokio::sync::{mpsc, oneshot, AcquireError, OwnedSemaphorePermit}; +use std::sync::Arc; +use tokio::sync::{mpsc, AcquireError, OwnedSemaphorePermit}; use tokio_stream::{wrappers::ReceiverStream, StreamExt}; /// `debug` API implementation. @@ -74,63 +74,11 @@ where Provider: BlockReaderIdExt + HeaderProvider + 'static, Eth: EthTransactions + 'static, { - /// Executes the future on a new blocking task. - async fn on_blocking_task(&self, c: C) -> EthResult - where - C: FnOnce(Self) -> F, - F: Future> + Send + 'static, - R: Send + 'static, - { - let (tx, rx) = oneshot::channel(); - let this = self.clone(); - let f = c(this); - self.inner.task_spawner.spawn_blocking(Box::pin(async move { - let res = f.await; - let _ = tx.send(res); - })); - rx.await.map_err(|_| EthApiError::InternalTracingError)? - } - /// Acquires a permit to execute a tracing call. async fn acquire_trace_permit(&self) -> Result { self.inner.tracing_call_guard.clone().acquire_owned().await } - /// Trace the entire block - fn trace_block_with_sync( - &self, - at: BlockId, - transactions: Vec, - cfg: CfgEnv, - block_env: BlockEnv, - opts: GethDebugTracingOptions, - ) -> EthResult> { - // replay all transactions of the block - let this = self.clone(); - self.inner.eth_api.with_state_at_block(at, move |state| { - let mut results = Vec::with_capacity(transactions.len()); - let mut db = SubState::new(State::new(state)); - - let mut transactions = transactions.into_iter().peekable(); - while let Some(tx) = transactions.next() { - let tx = tx.into_ecrecovered().ok_or(BlockError::InvalidSignature)?; - let tx = tx_env_with_recovered(&tx); - let env = Env { cfg: cfg.clone(), block: block_env.clone(), tx }; - let (result, state_changes) = - this.trace_transaction(opts.clone(), env, at, &mut db)?; - results.push(TraceResult::Success { result }); - - if transactions.peek().is_some() { - // need to apply the state changes of this transaction before executing the next - // transaction - db.commit(state_changes) - } - } - - Ok(results) - }) - } - /// Trace the entire block asynchronously async fn trace_block_with( &self, @@ -140,10 +88,33 @@ where block_env: BlockEnv, opts: GethDebugTracingOptions, ) -> EthResult> { - self.on_blocking_task(|this| async move { - this.trace_block_with_sync(at, transactions, cfg, block_env, opts) - }) - .await + // replay all transactions of the block + let this = self.clone(); + self.inner + .eth_api + .spawn_with_state_at_block(at, move |state| { + let mut results = Vec::with_capacity(transactions.len()); + let mut db = SubState::new(State::new(state)); + + let mut transactions = transactions.into_iter().peekable(); + while let Some(tx) = transactions.next() { + let tx = tx.into_ecrecovered().ok_or(BlockError::InvalidSignature)?; + let tx = tx_env_with_recovered(&tx); + let env = Env { cfg: cfg.clone(), block: block_env.clone(), tx }; + let (result, state_changes) = + this.trace_transaction(opts.clone(), env, at, &mut db)?; + results.push(TraceResult::Success { result }); + + if transactions.peek().is_some() { + // need to apply the state changes of this transaction before executing the + // next transaction + db.commit(state_changes) + } + } + + Ok(results) + }) + .await } /// Replays the given block and returns the trace of each transaction. @@ -171,17 +142,6 @@ where &self, block_id: BlockId, opts: GethDebugTracingOptions, - ) -> EthResult> { - self.on_blocking_task( - |this| async move { this.try_debug_trace_block(block_id, opts).await }, - ) - .await - } - - async fn try_debug_trace_block( - &self, - block_id: BlockId, - opts: GethDebugTracingOptions, ) -> EthResult> { let block_hash = self .inner @@ -199,7 +159,7 @@ where // its parent block's state let state_at = block.parent_hash; - self.trace_block_with_sync(state_at.into(), block.body, cfg, block_env, opts) + self.trace_block_with(state_at.into(), block.body, cfg, block_env, opts).await } /// Trace the transaction according to the provided options. @@ -221,8 +181,10 @@ where let state_at: BlockId = block.parent_hash.into(); let block_txs = block.body; - self.on_blocking_task(|this| async move { - this.inner.eth_api.with_state_at_block(state_at, |state| { + let this = self.clone(); + self.inner + .eth_api + .spawn_with_state_at_block(state_at, move |state| { // configure env for the target transaction let tx = transaction.into_recovered(); @@ -239,8 +201,7 @@ where let env = Env { cfg, block: block_env, tx: tx_env_with_recovered(&tx) }; this.trace_transaction(opts, env, state_at, &mut db).map(|(trace, _)| trace) }) - }) - .await + .await } /// The debug_traceCall method lets you run an `eth_call` within the context of the given block @@ -250,22 +211,6 @@ where call: CallRequest, block_id: Option, opts: GethDebugTracingCallOptions, - ) -> EthResult { - self.on_blocking_task(|this| async move { - this.try_debug_trace_call(call, block_id, opts).await - }) - .await - } - - /// The debug_traceCall method lets you run an `eth_call` within the context of the given block - /// execution using the final state of parent block as the base. - /// - /// Caution: while this is async, this may still be blocking on necessary DB io. - async fn try_debug_trace_call( - &self, - call: CallRequest, - block_id: Option, - opts: GethDebugTracingCallOptions, ) -> EthResult { let at = block_id.unwrap_or(BlockId::Number(BlockNumberOrTag::Latest)); let GethDebugTracingCallOptions { tracing_options, state_overrides, block_overrides } = @@ -278,10 +223,13 @@ where GethDebugTracerType::BuiltInTracer(tracer) => match tracer { GethDebugBuiltInTracerType::FourByteTracer => { let mut inspector = FourByteInspector::default(); - let (_res, _) = self + let inspector = self .inner .eth_api - .inspect_call_at(call, at, overrides, &mut inspector) + .spawn_with_call_at(call, at, overrides, move |db, env| { + inspect(db, env, &mut inspector)?; + Ok(inspector) + }) .await?; return Ok(FourByteFrame::from(inspector).into()) } @@ -295,10 +243,13 @@ where .set_record_logs(call_config.with_log.unwrap_or_default()), ); - let _ = self + let inspector = self .inner .eth_api - .inspect_call_at(call, at, overrides, &mut inspector) + .spawn_with_call_at(call, at, overrides, move |db, env| { + inspect(db, env, &mut inspector)?; + Ok(inspector) + }) .await?; let frame = inspector.into_geth_builder().geth_call_traces(call_config); @@ -351,8 +302,14 @@ where let mut inspector = TracingInspector::new(inspector_config); - let (res, _) = - self.inner.eth_api.inspect_call_at(call, at, overrides, &mut inspector).await?; + let (res, inspector) = self + .inner + .eth_api + .spawn_with_call_at(call, at, overrides, move |db, env| { + let (res, _) = inspect(db, env, &mut inspector)?; + Ok((res, inspector)) + }) + .await?; let gas_used = res.result.gas_used(); let return_value = result_output(&res.result).unwrap_or_default().into(); let frame = inspector.into_geth_builder().geth_traces(gas_used, return_value, config); @@ -365,6 +322,8 @@ where /// Returns the trace frame and the state that got updated after executing the transaction. /// /// Note: this does not apply any state overrides if they're configured in the `opts`. + /// + /// Caution: this is blocking and should be performed on a blocking task. fn trace_transaction( &self, opts: GethDebugTracingOptions, diff --git a/crates/rpc/rpc/src/eth/api/mod.rs b/crates/rpc/rpc/src/eth/api/mod.rs index de5d12f8d1..20ecf114e2 100644 --- a/crates/rpc/rpc/src/eth/api/mod.rs +++ b/crates/rpc/rpc/src/eth/api/mod.rs @@ -37,6 +37,7 @@ mod sign; mod state; mod transactions; +use crate::TracingCallPool; pub use transactions::{EthTransactions, TransactionSource}; /// `Eth` API trait. @@ -88,6 +89,7 @@ where eth_cache: EthStateCache, gas_oracle: GasPriceOracle, gas_cap: impl Into, + tracing_call_pool: TracingCallPool, ) -> Self { Self::with_spawner( provider, @@ -97,10 +99,12 @@ where gas_oracle, gas_cap.into().into(), Box::::default(), + tracing_call_pool, ) } /// Creates a new, shareable instance. + #[allow(clippy::too_many_arguments)] pub fn with_spawner( provider: Provider, pool: Pool, @@ -109,6 +113,7 @@ where gas_oracle: GasPriceOracle, gas_cap: u64, task_spawner: Box, + tracing_call_pool: TracingCallPool, ) -> Self { // get the block number of the latest block let latest_block = provider @@ -129,6 +134,7 @@ where starting_block: U256::from(latest_block), task_spawner, pending_block: Default::default(), + tracing_call_pool, }; Self { inner: Arc::new(inner) } } @@ -421,4 +427,6 @@ struct EthApiInner { task_spawner: Box, /// Cached pending block if any pending_block: Mutex>, + /// A pool dedicated to tracing calls + tracing_call_pool: TracingCallPool, } diff --git a/crates/rpc/rpc/src/eth/api/server.rs b/crates/rpc/rpc/src/eth/api/server.rs index 1cca7addc9..663308cd89 100644 --- a/crates/rpc/rpc/src/eth/api/server.rs +++ b/crates/rpc/rpc/src/eth/api/server.rs @@ -392,7 +392,7 @@ where mod tests { use crate::{ eth::{cache::EthStateCache, gas_oracle::GasPriceOracle}, - EthApi, + EthApi, TracingCallPool, }; use jsonrpsee::types::error::INVALID_PARAMS_CODE; use reth_interfaces::test_utils::{generators, generators::Rng}; @@ -428,6 +428,7 @@ mod tests { cache.clone(), GasPriceOracle::new(provider, Default::default(), cache), ETHEREUM_BLOCK_GAS_LIMIT, + TracingCallPool::build().expect("failed to build tracing pool"), ) } diff --git a/crates/rpc/rpc/src/eth/api/state.rs b/crates/rpc/rpc/src/eth/api/state.rs index 0930bf0b6c..2887ac58fb 100644 --- a/crates/rpc/rpc/src/eth/api/state.rs +++ b/crates/rpc/rpc/src/eth/api/state.rs @@ -146,7 +146,10 @@ where #[cfg(test)] mod tests { use super::*; - use crate::eth::{cache::EthStateCache, gas_oracle::GasPriceOracle}; + use crate::{ + eth::{cache::EthStateCache, gas_oracle::GasPriceOracle}, + TracingCallPool, + }; use reth_primitives::{constants::ETHEREUM_BLOCK_GAS_LIMIT, StorageKey, StorageValue}; use reth_provider::test_utils::{ExtendedAccount, MockEthProvider, NoopProvider}; use reth_transaction_pool::test_utils::testing_pool; @@ -165,6 +168,7 @@ mod tests { cache.clone(), GasPriceOracle::new(NoopProvider::default(), Default::default(), cache), ETHEREUM_BLOCK_GAS_LIMIT, + TracingCallPool::build().expect("failed to build tracing pool"), ); let address = Address::random(); let storage = eth_api.storage_at(address, U256::ZERO.into(), None).unwrap(); @@ -186,6 +190,7 @@ mod tests { cache.clone(), GasPriceOracle::new(mock_provider, Default::default(), cache), ETHEREUM_BLOCK_GAS_LIMIT, + TracingCallPool::build().expect("failed to build tracing pool"), ); let storage_key: U256 = storage_key.into(); diff --git a/crates/rpc/rpc/src/eth/api/transactions.rs b/crates/rpc/rpc/src/eth/api/transactions.rs index 514d436b3a..6d869daa60 100644 --- a/crates/rpc/rpc/src/eth/api/transactions.rs +++ b/crates/rpc/rpc/src/eth/api/transactions.rs @@ -40,7 +40,10 @@ use revm_primitives::{utilities::create_address, Env, ResultAndState, SpecId}; /// Helper alias type for the state's [CacheDB] pub(crate) type StateCacheDB<'r> = CacheDB>>; -/// Commonly used transaction related functions for the [EthApi] type in the `eth_` namespace +/// Commonly used transaction related functions for the [EthApi] type in the `eth_` namespace. +/// +/// Async functions that are spawned onto the +/// [TracingCallPool](crate::tracing_call::TracingCallPool) begin with `spawn_` #[async_trait::async_trait] pub trait EthTransactions: Send + Sync { /// Returns default gas limit to use for `eth_call` and tracing RPC methods. @@ -54,6 +57,12 @@ pub trait EthTransactions: Send + Sync { where F: FnOnce(StateProviderBox<'_>) -> EthResult; + /// Executes the closure with the state that corresponds to the given [BlockId] on a new task + async fn spawn_with_state_at_block(&self, at: BlockId, f: F) -> EthResult + where + F: FnOnce(StateProviderBox<'_>) -> EthResult + Send + 'static, + T: Send + 'static; + /// Returns the revm evm env for the requested [BlockId] /// /// If the [BlockId] this will return the [BlockId::Hash] of the block the env was configured @@ -121,8 +130,8 @@ pub trait EthTransactions: Send + Sync { async fn send_transaction(&self, request: TransactionRequest) -> EthResult; /// Prepares the state and env for the given [CallRequest] at the given [BlockId] and executes - /// the closure. - async fn with_call_at( + /// the closure on a new task returning the result of the closure. + async fn spawn_with_call_at( &self, request: CallRequest, at: BlockId, @@ -130,7 +139,8 @@ pub trait EthTransactions: Send + Sync { f: F, ) -> EthResult where - F: for<'r> FnOnce(StateCacheDB<'r>, Env) -> EthResult + Send; + F: for<'r> FnOnce(StateCacheDB<'r>, Env) -> EthResult + Send + 'static, + R: Send + 'static; /// Executes the call request at the given [BlockId]. async fn transact_call_at( @@ -140,8 +150,9 @@ pub trait EthTransactions: Send + Sync { overrides: EvmOverrides, ) -> EthResult<(ResultAndState, Env)>; - /// Executes the call request at the given [BlockId] - async fn inspect_call_at( + /// Executes the call request at the given [BlockId] on a new task and returns the result of the + /// inspect call. + async fn spawn_inspect_call_at( &self, request: CallRequest, at: BlockId, @@ -149,24 +160,15 @@ pub trait EthTransactions: Send + Sync { inspector: I, ) -> EthResult<(ResultAndState, Env)> where - I: for<'r> Inspector> + Send; - - /// Executes the call request at the given [BlockId] - async fn inspect_call_at_and_return_state<'a, I>( - &'a self, - request: CallRequest, - at: BlockId, - overrides: EvmOverrides, - inspector: I, - ) -> EthResult<(ResultAndState, Env, StateCacheDB<'a>)> - where - I: Inspector> + Send; + I: for<'r> Inspector> + Send + 'static; /// Executes the transaction on top of the given [BlockId] with a tracer configured by the /// config. /// /// The callback is then called with the [TracingInspector] and the [ResultAndState] after the /// configured [Env] was inspected. + /// + /// Caution: this is blocking fn trace_at( &self, env: Env, @@ -184,7 +186,7 @@ pub trait EthTransactions: Send + Sync { /// /// The callback is then called with the [TracingInspector] and the [ResultAndState] after the /// configured [Env] was inspected. - fn trace_at_with_state( + async fn spawn_trace_at_with_state( &self, env: Env, config: TracingInspectorConfig, @@ -192,7 +194,10 @@ pub trait EthTransactions: Send + Sync { f: F, ) -> EthResult where - F: for<'a> FnOnce(TracingInspector, ResultAndState, StateCacheDB<'a>) -> EthResult; + F: for<'a> FnOnce(TracingInspector, ResultAndState, StateCacheDB<'a>) -> EthResult + + Send + + 'static, + R: Send + 'static; /// Fetches the transaction and the transaction's block async fn transaction_and_block( @@ -206,7 +211,10 @@ pub trait EthTransactions: Send + Sync { /// state by executing them first. /// The callback `f` is invoked with the [ResultAndState] after the transaction was executed and /// the database that points to the beginning of the transaction. - async fn trace_transaction_in_block( + /// + /// Note: Implementers should use a threadpool where blocking is allowed, such as + /// [TracingCallPool](crate::tracing_call::TracingCallPool). + async fn spawn_trace_transaction_in_block( &self, hash: H256, config: TracingInspectorConfig, @@ -219,7 +227,9 @@ pub trait EthTransactions: Send + Sync { ResultAndState, StateCacheDB<'a>, ) -> EthResult - + Send; + + Send + + 'static, + R: Send + 'static; } #[async_trait] @@ -245,6 +255,22 @@ where f(state) } + async fn spawn_with_state_at_block(&self, at: BlockId, f: F) -> EthResult + where + F: FnOnce(StateProviderBox<'_>) -> EthResult + Send + 'static, + T: Send + 'static, + { + let this = self.clone(); + self.inner + .tracing_call_pool + .spawn(move || { + let state = this.state_at(at)?; + f(state) + }) + .await + .map_err(|_| EthApiError::InternalTracingError)? + } + async fn evm_env_at(&self, at: BlockId) -> EthResult<(CfgEnv, BlockEnv, BlockId)> { if at.is_pending() { let PendingBlockEnv { cfg, block_env, origin } = self.pending_block_env_and_cfg()?; @@ -473,7 +499,7 @@ where Ok(hash) } - async fn with_call_at( + async fn spawn_with_call_at( &self, request: CallRequest, at: BlockId, @@ -481,15 +507,29 @@ where f: F, ) -> EthResult where - F: for<'r> FnOnce(StateCacheDB<'r>, Env) -> EthResult + Send, + F: for<'r> FnOnce(StateCacheDB<'r>, Env) -> EthResult + Send + 'static, + R: Send + 'static, { let (cfg, block_env, at) = self.evm_env_at(at).await?; - let state = self.state_at(at)?; - let mut db = SubState::new(State::new(state)); + let this = self.clone(); + self.inner + .tracing_call_pool + .spawn(move || { + let state = this.state_at(at)?; + let mut db = SubState::new(State::new(state)); - let env = - prepare_call_env(cfg, block_env, request, self.call_gas_limit(), &mut db, overrides)?; - f(db, env) + let env = prepare_call_env( + cfg, + block_env, + request, + this.call_gas_limit(), + &mut db, + overrides, + )?; + f(db, env) + }) + .await + .map_err(|_| EthApiError::InternalTracingError)? } async fn transact_call_at( @@ -498,10 +538,11 @@ where at: BlockId, overrides: EvmOverrides, ) -> EthResult<(ResultAndState, Env)> { - self.with_call_at(request, at, overrides, |mut db, env| transact(&mut db, env)).await + self.spawn_with_call_at(request, at, overrides, move |mut db, env| transact(&mut db, env)) + .await } - async fn inspect_call_at( + async fn spawn_inspect_call_at( &self, request: CallRequest, at: BlockId, @@ -509,28 +550,10 @@ where inspector: I, ) -> EthResult<(ResultAndState, Env)> where - I: for<'r> Inspector> + Send, + I: for<'r> Inspector> + Send + 'static, { - self.with_call_at(request, at, overrides, |db, env| inspect(db, env, inspector)).await - } - - async fn inspect_call_at_and_return_state<'a, I>( - &'a self, - request: CallRequest, - at: BlockId, - overrides: EvmOverrides, - inspector: I, - ) -> EthResult<(ResultAndState, Env, StateCacheDB<'a>)> - where - I: Inspector> + Send, - { - let (cfg, block_env, at) = self.evm_env_at(at).await?; - let state = self.state_at(at)?; - let mut db = SubState::new(State::new(state)); - - let env = - prepare_call_env(cfg, block_env, request, self.call_gas_limit(), &mut db, overrides)?; - inspect_and_return_db(db, env, inspector) + self.spawn_with_call_at(request, at, overrides, move |db, env| inspect(db, env, inspector)) + .await } fn trace_at( @@ -553,7 +576,7 @@ where }) } - fn trace_at_with_state( + async fn spawn_trace_at_with_state( &self, env: Env, config: TracingInspectorConfig, @@ -561,15 +584,19 @@ where f: F, ) -> EthResult where - F: for<'a> FnOnce(TracingInspector, ResultAndState, StateCacheDB<'a>) -> EthResult, + F: for<'a> FnOnce(TracingInspector, ResultAndState, StateCacheDB<'a>) -> EthResult + + Send + + 'static, + R: Send + 'static, { - self.with_state_at_block(at, |state| { + self.spawn_with_state_at_block(at, move |state| { let db = SubState::new(State::new(state)); let mut inspector = TracingInspector::new(config); let (res, _, db) = inspect_and_return_db(db, env, &mut inspector)?; f(inspector, res, db) }) + .await } async fn transaction_and_block( @@ -590,7 +617,7 @@ where Ok(block.map(|block| (transaction, block.seal(block_hash)))) } - async fn trace_transaction_in_block( + async fn spawn_trace_transaction_in_block( &self, hash: H256, config: TracingInspectorConfig, @@ -603,7 +630,9 @@ where ResultAndState, StateCacheDB<'a>, ) -> EthResult - + Send, + + Send + + 'static, + R: Send + 'static, { let (transaction, block) = match self.transaction_and_block(hash).await? { None => return Ok(None), @@ -618,7 +647,7 @@ where let parent_block = block.parent_hash; let block_txs = block.body; - self.with_state_at_block(parent_block.into(), |state| { + self.spawn_with_state_at_block(parent_block.into(), move |state| { let mut db = SubState::new(State::new(state)); // replay all transactions prior to the targeted transaction @@ -630,6 +659,7 @@ where let (res, _, db) = inspect_and_return_db(db, env, &mut inspector)?; f(tx_info, inspector, res, db) }) + .await .map(Some) } } @@ -878,7 +908,7 @@ mod tests { use super::*; use crate::{ eth::{cache::EthStateCache, gas_oracle::GasPriceOracle}, - EthApi, + EthApi, TracingCallPool, }; use reth_network_api::noop::NoopNetwork; use reth_primitives::{constants::ETHEREUM_BLOCK_GAS_LIMIT, hex_literal::hex, Bytes}; @@ -900,6 +930,7 @@ mod tests { cache.clone(), GasPriceOracle::new(noop_provider, Default::default(), cache), ETHEREUM_BLOCK_GAS_LIMIT, + TracingCallPool::build().expect("failed to build tracing pool"), ); // https://etherscan.io/tx/0xa694b71e6c128a2ed8e2e0f6770bddbe52e3bb8f10e8472f9a79ab81497a8b5d diff --git a/crates/rpc/rpc/src/lib.rs b/crates/rpc/rpc/src/lib.rs index b885608139..e1818523f2 100644 --- a/crates/rpc/rpc/src/lib.rs +++ b/crates/rpc/rpc/src/lib.rs @@ -31,7 +31,6 @@ //! disk-io, hence these calls are spawned as futures to a blocking task manually. mod admin; -mod call_guard; mod debug; mod engine; pub mod eth; @@ -41,11 +40,11 @@ mod otterscan; mod reth; mod rpc; mod trace; +pub mod tracing_call; mod txpool; mod web3; pub use admin::AdminApi; -pub use call_guard::TracingCallGuard; pub use debug::DebugApi; pub use engine::{EngineApi, EngineEthApi}; pub use eth::{EthApi, EthApiSpec, EthFilter, EthPubSub, EthSubscriptionIdProvider}; @@ -55,6 +54,7 @@ pub use otterscan::OtterscanApi; pub use reth::RethApi; pub use rpc::RPCApi; pub use trace::TraceApi; +pub use tracing_call::{TracingCallGuard, TracingCallPool}; pub use txpool::TxPoolApi; pub use web3::Web3Api; diff --git a/crates/rpc/rpc/src/trace.rs b/crates/rpc/rpc/src/trace.rs index ff2d09e50c..494741097c 100644 --- a/crates/rpc/rpc/src/trace.rs +++ b/crates/rpc/rpc/src/trace.rs @@ -1,8 +1,7 @@ use crate::{ eth::{ - cache::EthStateCache, error::{EthApiError, EthResult}, - revm_utils::{inspect, prepare_call_env, EvmOverrides}, + revm_utils::{inspect, inspect_and_return_db, prepare_call_env, EvmOverrides}, utils::recover_raw_transaction, EthTransactions, }, @@ -29,11 +28,10 @@ use reth_rpc_types::{ trace::{filter::TraceFilter, parity::*}, BlockError, BlockOverrides, CallRequest, Index, TransactionInfo, }; -use reth_tasks::TaskSpawner; use revm::{db::CacheDB, primitives::Env}; use revm_primitives::{db::DatabaseCommit, ExecutionResult, ResultAndState}; -use std::{collections::HashSet, future::Future, sync::Arc}; -use tokio::sync::{oneshot, AcquireError, OwnedSemaphorePermit}; +use std::{collections::HashSet, sync::Arc}; +use tokio::sync::{AcquireError, OwnedSemaphorePermit}; /// `trace` API implementation. /// @@ -51,20 +49,8 @@ impl TraceApi { } /// Create a new instance of the [TraceApi] - pub fn new( - provider: Provider, - eth_api: Eth, - eth_cache: EthStateCache, - task_spawner: Box, - tracing_call_guard: TracingCallGuard, - ) -> Self { - let inner = Arc::new(TraceApiInner { - provider, - eth_api, - eth_cache, - task_spawner, - tracing_call_guard, - }); + pub fn new(provider: Provider, eth_api: Eth, tracing_call_guard: TracingCallGuard) -> Self { + let inner = Arc::new(TraceApiInner { provider, eth_api, tracing_call_guard }); Self { inner } } @@ -83,23 +69,6 @@ where Provider: BlockReader + StateProviderFactory + EvmEnvProvider + ChainSpecProvider + 'static, Eth: EthTransactions + 'static, { - /// Executes the future on a new blocking task. - async fn on_blocking_task(&self, c: C) -> EthResult - where - C: FnOnce(Self) -> F, - F: Future> + Send + 'static, - R: Send + 'static, - { - let (tx, rx) = oneshot::channel(); - let this = self.clone(); - let f = c(this); - self.inner.task_spawner.spawn_blocking(Box::pin(async move { - let res = f.await; - let _ = tx.send(res); - })); - rx.await.map_err(|_| EthApiError::InternalTracingError)? - } - /// Executes the given call and returns a number of possible traces for it. pub async fn trace_call( &self, @@ -108,43 +77,23 @@ where block_id: Option, state_overrides: Option, block_overrides: Option>, - ) -> EthResult { - self.on_blocking_task(|this| async move { - this.try_trace_call( - call, - trace_types, - block_id, - EvmOverrides::new(state_overrides, block_overrides), - ) - .await - }) - .await - } - - async fn try_trace_call( - &self, - call: CallRequest, - trace_types: HashSet, - block_id: Option, - overrides: EvmOverrides, ) -> EthResult { let at = block_id.unwrap_or(BlockId::Number(BlockNumberOrTag::Latest)); let config = tracing_config(&trace_types); + let overrides = EvmOverrides::new(state_overrides, block_overrides); let mut inspector = TracingInspector::new(config); - - let (res, _, db) = self - .inner + self.inner .eth_api - .inspect_call_at_and_return_state(call, at, overrides, &mut inspector) - .await?; - - let trace_res = inspector.into_parity_builder().into_trace_results_with_state( - res, - &trace_types, - &db, - )?; - - Ok(trace_res) + .spawn_with_call_at(call, at, overrides, move |db, env| { + let (res, _, db) = inspect_and_return_db(db, env, &mut inspector)?; + let trace_res = inspector.into_parity_builder().into_trace_results_with_state( + res, + &trace_types, + &db, + )?; + Ok(trace_res) + }) + .await } /// Traces a call to `eth_sendRawTransaction` without making the call, returning the traces. @@ -166,16 +115,16 @@ where let config = tracing_config(&trace_types); - self.on_blocking_task(|this| async move { - this.inner.eth_api.trace_at_with_state(env, config, at, |inspector, res, db| { + self.inner + .eth_api + .spawn_trace_at_with_state(env, config, at, move |inspector, res, db| { Ok(inspector.into_parity_builder().into_trace_results_with_state( res, &trace_types, &db, )?) }) - }) - .await + .await } /// Performs multiple call traces on top of the same block. i.e. transaction n will be executed @@ -190,10 +139,11 @@ where let at = block_id.unwrap_or(BlockId::Number(BlockNumberOrTag::Pending)); let (cfg, block_env, at) = self.inner.eth_api.evm_env_at(at).await?; - self.on_blocking_task(|this| async move { - let gas_limit = this.inner.eth_api.call_gas_limit(); - // execute all transactions on top of each other and record the traces - this.inner.eth_api.with_state_at_block(at, move |state| { + let gas_limit = self.inner.eth_api.call_gas_limit(); + // execute all transactions on top of each other and record the traces + self.inner + .eth_api + .spawn_with_state_at_block(at, move |state| { let mut results = Vec::with_capacity(calls.len()); let mut db = SubState::new(State::new(state)); @@ -239,8 +189,7 @@ where Ok(results) }) - }) - .await + .await } /// Replays a transaction, returning the traces. @@ -250,22 +199,19 @@ where trace_types: HashSet, ) -> EthResult { let config = tracing_config(&trace_types); - self.on_blocking_task(|this| async move { - this.inner - .eth_api - .trace_transaction_in_block(hash, config, |_, inspector, res, db| { - let trace_res = inspector.into_parity_builder().into_trace_results_with_state( - res, - &trace_types, - &db, - )?; - Ok(trace_res) - }) - .await - .transpose() - .ok_or_else(|| EthApiError::TransactionNotFound)? - }) - .await + self.inner + .eth_api + .spawn_trace_transaction_in_block(hash, config, move |_, inspector, res, db| { + let trace_res = inspector.into_parity_builder().into_trace_results_with_state( + res, + &trace_types, + &db, + )?; + Ok(trace_res) + }) + .await + .transpose() + .ok_or_else(|| EthApiError::TransactionNotFound)? } /// Returns transaction trace objects at the given index @@ -308,22 +254,18 @@ where &self, hash: H256, ) -> EthResult>> { - self.on_blocking_task(|this| async move { - this.inner - .eth_api - .trace_transaction_in_block( - hash, - TracingInspectorConfig::default_parity(), - |tx_info, inspector, _, _| { - let traces = inspector - .into_parity_builder() - .into_localized_transaction_traces(tx_info); - Ok(traces) - }, - ) - .await - }) - .await + self.inner + .eth_api + .spawn_trace_transaction_in_block( + hash, + TracingInspectorConfig::default_parity(), + move |tx_info, inspector, _, _| { + let traces = + inspector.into_parity_builder().into_localized_transaction_traces(tx_info); + Ok(traces) + }, + ) + .await } /// Executes all transactions of a block and returns a list of callback results. @@ -371,48 +313,46 @@ where let block_hash = block.hash; let transactions = block.body; - self.on_blocking_task(|this| async move { - // replay all transactions of the block - this.inner - .eth_api - .with_state_at_block(state_at.into(), move |state| { - let mut results = Vec::with_capacity(transactions.len()); - let mut db = SubState::new(State::new(state)); + // replay all transactions of the block + self.inner + .eth_api + .spawn_with_state_at_block(state_at.into(), move |state| { + let mut results = Vec::with_capacity(transactions.len()); + let mut db = SubState::new(State::new(state)); - let mut transactions = transactions.into_iter().enumerate().peekable(); + let mut transactions = transactions.into_iter().enumerate().peekable(); - while let Some((idx, tx)) = transactions.next() { - let tx = tx.into_ecrecovered().ok_or(BlockError::InvalidSignature)?; - let tx_info = TransactionInfo { - hash: Some(tx.hash()), - index: Some(idx as u64), - block_hash: Some(block_hash), - block_number: Some(block_env.number.try_into().unwrap_or(u64::MAX)), - base_fee: Some(block_env.basefee.try_into().unwrap_or(u64::MAX)), - }; + while let Some((idx, tx)) = transactions.next() { + let tx = tx.into_ecrecovered().ok_or(BlockError::InvalidSignature)?; + let tx_info = TransactionInfo { + hash: Some(tx.hash()), + index: Some(idx as u64), + block_hash: Some(block_hash), + block_number: Some(block_env.number.try_into().unwrap_or(u64::MAX)), + base_fee: Some(block_env.basefee.try_into().unwrap_or(u64::MAX)), + }; - let tx = tx_env_with_recovered(&tx); - let env = Env { cfg: cfg.clone(), block: block_env.clone(), tx }; + let tx = tx_env_with_recovered(&tx); + let env = Env { cfg: cfg.clone(), block: block_env.clone(), tx }; - let mut inspector = TracingInspector::new(config); - let (res, _) = inspect(&mut db, env, &mut inspector)?; - let ResultAndState { result, state } = res; - results.push(f(tx_info, inspector, result, &state, &db)?); + let mut inspector = TracingInspector::new(config); + let (res, _) = inspect(&mut db, env, &mut inspector)?; + let ResultAndState { result, state } = res; + results.push(f(tx_info, inspector, result, &state, &db)?); - // need to apply the state changes of this transaction before executing the - // next transaction - if transactions.peek().is_some() { - // need to apply the state changes of this transaction before executing - // the next transaction - db.commit(state) - } + // need to apply the state changes of this transaction before executing the + // next transaction + if transactions.peek().is_some() { + // need to apply the state changes of this transaction before executing + // the next transaction + db.commit(state) } + } - Ok(results) - }) - .map(Some) - }) - .await + Ok(results) + }) + .await + .map(Some) } /// Returns traces created at given block. @@ -626,11 +566,6 @@ struct TraceApiInner { provider: Provider, /// Access to commonly used code of the `eth` namespace eth_api: Eth, - /// The async cache frontend for eth-related data - #[allow(unused)] // we need this for trace_filter eventually - eth_cache: EthStateCache, - /// The type that can spawn tasks which would otherwise be blocking. - task_spawner: Box, // restrict the number of concurrent calls to `trace_*` tracing_call_guard: TracingCallGuard, } diff --git a/crates/rpc/rpc-builder/src/tracing_pool.rs b/crates/rpc/rpc/src/tracing_call.rs similarity index 71% rename from crates/rpc/rpc-builder/src/tracing_pool.rs rename to crates/rpc/rpc/src/tracing_call.rs index dd30561178..26956ae2d5 100644 --- a/crates/rpc/rpc-builder/src/tracing_pool.rs +++ b/crates/rpc/rpc/src/tracing_call.rs @@ -8,9 +8,46 @@ use std::{ task::{ready, Context, Poll}, thread, }; -use tokio::sync::oneshot; +use tokio::sync::{oneshot, AcquireError, OwnedSemaphorePermit, Semaphore}; + +/// RPC Tracing call guard semaphore. +/// +/// This is used to restrict the number of concurrent RPC requests to tracing methods like +/// `debug_traceTransaction` because they can consume a lot of memory and CPU. +/// +/// This types serves as an entry guard for the [TracingCallPool] and is used to rate limit parallel +/// tracing calls on the pool. +#[derive(Clone, Debug)] +pub struct TracingCallGuard(Arc); + +impl TracingCallGuard { + /// Create a new `TracingCallGuard` with the given maximum number of tracing calls in parallel. + pub fn new(max_tracing_requests: u32) -> Self { + Self(Arc::new(Semaphore::new(max_tracing_requests as usize))) + } + + /// See also [Semaphore::acquire_owned] + pub async fn acquire_owned(self) -> Result { + self.0.acquire_owned().await + } + + /// See also [Semaphore::acquire_many_owned] + pub async fn acquire_many_owned(self, n: u32) -> Result { + self.0.acquire_many_owned(n).await + } +} /// Used to execute tracing calls on a rayon threadpool from within a tokio runtime. +/// +/// This is a dedicated threadpool for tracing calls which are CPU bound. +/// RPC calls that perform blocking IO (disk lookups) are not executed on this pool but on the tokio +/// runtime's blocking pool, which performs poorly with CPU bound tasks. Once the tokio blocking +/// pool is saturated it is converted into a queue, tracing calls could then interfere with the +/// queue and block other RPC calls. +/// +/// See also [tokio-docs] for more information. +/// +/// [tokio-docs]: https://docs.rs/tokio/latest/tokio/index.html#cpu-bound-tasks-and-blocking-code #[derive(Clone, Debug)] pub struct TracingCallPool { pool: Arc,