From bf9a3c533838775d30690604769ffdc6e5fbb5f3 Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Mon, 15 May 2023 22:37:29 +0200 Subject: [PATCH] perf: spawn blocking trace calls (#2685) --- crates/rpc/rpc/src/debug.rs | 164 +++++++++++++----- crates/rpc/rpc/src/eth/error.rs | 4 + crates/rpc/rpc/src/trace.rs | 284 ++++++++++++++++++++------------ 3 files changed, 307 insertions(+), 145 deletions(-) diff --git a/crates/rpc/rpc/src/debug.rs b/crates/rpc/rpc/src/debug.rs index 02f999d6af..94c586f486 100644 --- a/crates/rpc/rpc/src/debug.rs +++ b/crates/rpc/rpc/src/debug.rs @@ -29,22 +29,14 @@ use reth_rpc_types::{ use reth_tasks::TaskSpawner; use revm::primitives::Env; use revm_primitives::{db::DatabaseCommit, BlockEnv, CfgEnv}; -use tokio::sync::{AcquireError, OwnedSemaphorePermit}; +use std::{future::Future, sync::Arc}; +use tokio::sync::{oneshot, AcquireError, OwnedSemaphorePermit}; /// `debug` API implementation. /// /// This type provides the functionality for handling `debug` related requests. -#[non_exhaustive] pub struct DebugApi { - /// The client that can interact with the chain. - client: Client, - /// The implementation of `eth` API - eth_api: Eth, - // restrict the number of concurrent calls to tracing calls - tracing_call_guard: TracingCallGuard, - /// The type that can spawn tasks which would otherwise block. - #[allow(unused)] - task_spawner: Box, + inner: Arc>, } // === impl DebugApi === @@ -57,7 +49,9 @@ impl DebugApi { task_spawner: Box, tracing_call_guard: TracingCallGuard, ) -> Self { - Self { client, eth_api: eth, task_spawner, tracing_call_guard } + let inner = + Arc::new(DebugApiInner { client, eth_api: eth, task_spawner, tracing_call_guard }); + Self { inner } } } @@ -68,13 +62,30 @@ where Client: BlockProviderIdExt + HeaderProvider + 'static, Eth: EthTransactions + 'static, { + /// Executes the future on a new blocking task. + async fn on_blocking_task(&self, c: C) -> EthResult + where + C: FnOnce(Self) -> F, + F: Future> + Send + 'static, + R: Send + 'static, + { + let (tx, rx) = oneshot::channel(); + let this = self.clone(); + let f = c(this); + self.inner.task_spawner.spawn_blocking(Box::pin(async move { + let res = f.await; + let _ = tx.send(res); + })); + rx.await.map_err(|_| EthApiError::InternalTracingError)? + } + /// Acquires a permit to execute a tracing call. async fn acquire_trace_permit(&self) -> Result { - self.tracing_call_guard.clone().acquire_owned().await + self.inner.tracing_call_guard.clone().acquire_owned().await } /// Trace the entire block - fn trace_block_with( + fn trace_block_with_sync( &self, at: BlockId, transactions: Vec, @@ -83,7 +94,7 @@ where opts: GethDebugTracingOptions, ) -> EthResult> { // replay all transactions of the block - self.eth_api.with_state_at_block(at, move |state| { + self.inner.eth_api.with_state_at_block(at, move |state| { let mut results = Vec::with_capacity(transactions.len()); let mut db = SubState::new(State::new(state)); @@ -106,6 +117,21 @@ where }) } + /// Trace the entire block asynchronously + async fn trace_block_with( + &self, + at: BlockId, + transactions: Vec, + cfg: CfgEnv, + block_env: BlockEnv, + opts: GethDebugTracingOptions, + ) -> EthResult> { + self.on_blocking_task(|this| async move { + this.trace_block_with_sync(at, transactions, cfg, block_env, opts) + }) + .await + } + /// Replays the given block and returns the trace of each transaction. /// /// This expects a rlp encoded block @@ -119,11 +145,11 @@ where let block = Block::decode(&mut rlp_block.as_ref()).map_err(BlockError::RlpDecodeRawBlock)?; - let (cfg, block_env) = self.eth_api.evm_env_for_raw_block(&block.header).await?; + let (cfg, block_env) = self.inner.eth_api.evm_env_for_raw_block(&block.header).await?; // we trace on top the block's parent block let parent = block.parent_hash; - self.trace_block_with(parent.into(), block.body, cfg, block_env, opts) + self.trace_block_with(parent.into(), block.body, cfg, block_env, opts).await } /// Replays a block and returns the trace of each transaction. @@ -131,15 +157,27 @@ where &self, block_id: BlockId, opts: GethDebugTracingOptions, + ) -> EthResult> { + self.on_blocking_task( + |this| async move { this.try_debug_trace_block(block_id, opts).await }, + ) + .await + } + + async fn try_debug_trace_block( + &self, + block_id: BlockId, + opts: GethDebugTracingOptions, ) -> EthResult> { let block_hash = self + .inner .client .block_hash_for_id(block_id)? .ok_or_else(|| EthApiError::UnknownBlockNumber)?; let ((cfg, block_env, _), block) = futures::try_join!( - self.eth_api.evm_env_at(block_hash.into()), - self.eth_api.block_by_id(block_id), + self.inner.eth_api.evm_env_at(block_hash.into()), + self.inner.eth_api.block_by_id(block_id), )?; let block = block.ok_or_else(|| EthApiError::UnknownBlockNumber)?; @@ -147,7 +185,7 @@ where // its parent block's state let state_at = block.parent_hash; - self.trace_block_with(state_at.into(), block.body, cfg, block_env, opts) + self.trace_block_with_sync(state_at.into(), block.body, cfg, block_env, opts) } /// Trace the transaction according to the provided options. @@ -158,28 +196,37 @@ where tx_hash: H256, opts: GethDebugTracingOptions, ) -> EthResult { - let (transaction, block) = match self.eth_api.transaction_and_block(tx_hash).await? { + let (transaction, block) = match self.inner.eth_api.transaction_and_block(tx_hash).await? { None => return Err(EthApiError::TransactionNotFound), Some(res) => res, }; - let (cfg, block_env, _) = self.eth_api.evm_env_at(block.hash.into()).await?; + let (cfg, block_env, _) = self.inner.eth_api.evm_env_at(block.hash.into()).await?; // we need to get the state of the parent block because we're essentially replaying the // block the transaction is included in let state_at = block.parent_hash; let block_txs = block.body; - self.eth_api.with_state_at_block(state_at.into(), |state| { - // configure env for the target transaction - let tx = transaction.into_recovered(); + self.on_blocking_task(|this| async move { + this.inner.eth_api.with_state_at_block(state_at.into(), |state| { + // configure env for the target transaction + let tx = transaction.into_recovered(); - let mut db = SubState::new(State::new(state)); - // replay all transactions prior to the targeted transaction - replay_transactions_until(&mut db, cfg.clone(), block_env.clone(), block_txs, tx.hash)?; + let mut db = SubState::new(State::new(state)); + // replay all transactions prior to the targeted transaction + replay_transactions_until( + &mut db, + cfg.clone(), + block_env.clone(), + block_txs, + tx.hash, + )?; - let env = Env { cfg, block: block_env, tx: tx_env_with_recovered(&tx) }; - trace_transaction(opts, env, &mut db).map(|(trace, _)| trace) + let env = Env { cfg, block: block_env, tx: tx_env_with_recovered(&tx) }; + trace_transaction(opts, env, &mut db).map(|(trace, _)| trace) + }) }) + .await } /// The debug_traceCall method lets you run an `eth_call` within the context of the given block @@ -189,6 +236,22 @@ where call: CallRequest, block_id: Option, opts: GethDebugTracingCallOptions, + ) -> EthResult { + self.on_blocking_task(|this| async move { + this.try_debug_trace_call(call, block_id, opts).await + }) + .await + } + + /// The debug_traceCall method lets you run an `eth_call` within the context of the given block + /// execution using the final state of parent block as the base. + /// + /// Caution: while this is async, this may still be blocking on necessary DB io. + async fn try_debug_trace_call( + &self, + call: CallRequest, + block_id: Option, + opts: GethDebugTracingCallOptions, ) -> EthResult { let at = block_id.unwrap_or(BlockId::Number(BlockNumberOrTag::Latest)); // TODO(mattsse) apply block overrides @@ -209,6 +272,7 @@ where GethDebugBuiltInTracerType::FourByteTracer => { let mut inspector = FourByteInspector::default(); let (_res, _) = self + .inner .eth_api .inspect_call_at(call, at, state_overrides, &mut inspector) .await?; @@ -224,6 +288,7 @@ where ); let _ = self + .inner .eth_api .inspect_call_at(call, at, state_overrides, &mut inspector) .await?; @@ -249,7 +314,7 @@ where let mut inspector = TracingInspector::new(inspector_config); let (res, _) = - self.eth_api.inspect_call_at(call, at, state_overrides, &mut inspector).await?; + self.inner.eth_api.inspect_call_at(call, at, state_overrides, &mut inspector).await?; let gas_used = res.result.gas_used(); let frame = inspector.into_geth_builder().geth_traces(U256::from(gas_used), config); @@ -267,13 +332,15 @@ where /// Handler for `debug_getRawHeader` async fn raw_header(&self, block_id: BlockId) -> RpcResult { let header = match block_id { - BlockId::Hash(hash) => self.client.header(&hash.into()).to_rpc_result()?, + BlockId::Hash(hash) => self.inner.client.header(&hash.into()).to_rpc_result()?, BlockId::Number(number_or_tag) => { - let number = - self.client.convert_block_number(number_or_tag).to_rpc_result()?.ok_or_else( - || internal_rpc_err("Pending block not supported".to_string()), - )?; - self.client.header_by_number(number).to_rpc_result()? + let number = self + .inner + .client + .convert_block_number(number_or_tag) + .to_rpc_result()? + .ok_or_else(|| internal_rpc_err("Pending block not supported".to_string()))?; + self.inner.client.header_by_number(number).to_rpc_result()? } }; @@ -287,7 +354,7 @@ where /// Handler for `debug_getRawBlock` async fn raw_block(&self, block_id: BlockId) -> RpcResult { - let block = self.client.block_by_id(block_id).to_rpc_result()?; + let block = self.inner.client.block_by_id(block_id).to_rpc_result()?; let mut res = Vec::new(); if let Some(mut block) = block { @@ -304,7 +371,7 @@ where /// Handler for `debug_getRawTransaction` /// Returns the bytes of the transaction for the given hash. async fn raw_transaction(&self, hash: H256) -> RpcResult { - let tx = self.eth_api.transaction_by_hash(hash).await?; + let tx = self.inner.eth_api.transaction_by_hash(hash).await?; let mut res = Vec::new(); if let Some(tx) = tx.map(TransactionSource::into_recovered) { @@ -317,7 +384,7 @@ where /// Handler for `debug_getRawReceipts` async fn raw_receipts(&self, block_id: BlockId) -> RpcResult> { let receipts = - self.client.receipts_by_block_id(block_id).to_rpc_result()?.unwrap_or_default(); + self.inner.client.receipts_by_block_id(block_id).to_rpc_result()?.unwrap_or_default(); let mut all_receipts = Vec::with_capacity(receipts.len()); for receipt in receipts { @@ -402,6 +469,23 @@ impl std::fmt::Debug for DebugApi { } } +impl Clone for DebugApi { + fn clone(&self) -> Self { + Self { inner: Arc::clone(&self.inner) } + } +} + +struct DebugApiInner { + /// The client that can interact with the chain. + client: Client, + /// The implementation of `eth` API + eth_api: Eth, + // restrict the number of concurrent calls to tracing calls + tracing_call_guard: TracingCallGuard, + /// The type that can spawn tasks which would otherwise block. + task_spawner: Box, +} + /// Executes the configured transaction with the environment on the given database. /// /// Returns the trace frame and the state that got updated after executing the transaction. diff --git a/crates/rpc/rpc/src/eth/error.rs b/crates/rpc/rpc/src/eth/error.rs index 23b6b773cd..195c964ca7 100644 --- a/crates/rpc/rpc/src/eth/error.rs +++ b/crates/rpc/rpc/src/eth/error.rs @@ -63,6 +63,9 @@ pub enum EthApiError { /// Percentile array is invalid #[error("invalid reward percentile")] InvalidRewardPercentile(f64), + /// Error thrown when a spawned tracing task failed to deliver an anticipated response. + #[error("Internal error while tracing")] + InternalTracingError, } impl From for ErrorObject<'static> { @@ -87,6 +90,7 @@ impl From for ErrorObject<'static> { } EthApiError::Unsupported(msg) => internal_rpc_err(msg), EthApiError::InvalidRewardPercentile(msg) => internal_rpc_err(msg.to_string()), + err @ EthApiError::InternalTracingError => internal_rpc_err(err.to_string()), } } } diff --git a/crates/rpc/rpc/src/trace.rs b/crates/rpc/rpc/src/trace.rs index 6373ac00a6..e203f11981 100644 --- a/crates/rpc/rpc/src/trace.rs +++ b/crates/rpc/rpc/src/trace.rs @@ -26,26 +26,14 @@ use reth_rpc_types::{ use reth_tasks::TaskSpawner; use revm::primitives::Env; use revm_primitives::{db::DatabaseCommit, ExecutionResult}; -use std::collections::HashSet; -use tokio::sync::{AcquireError, OwnedSemaphorePermit}; +use std::{collections::HashSet, future::Future, sync::Arc}; +use tokio::sync::{oneshot, AcquireError, OwnedSemaphorePermit}; /// `trace` API implementation. /// /// This type provides the functionality for handling `trace` related requests. -#[derive(Clone)] pub struct TraceApi { - /// The client that can interact with the chain. - client: Client, - /// Access to commonly used code of the `eth` namespace - eth_api: Eth, - /// The async cache frontend for eth-related data - #[allow(unused)] // we need this for trace_filter eventually - eth_cache: EthStateCache, - /// The type that can spawn tasks which would otherwise be blocking. - #[allow(unused)] - task_spawner: Box, - // restrict the number of concurrent calls to `trace_*` - tracing_call_guard: TracingCallGuard, + inner: Arc>, } // === impl TraceApi === @@ -53,7 +41,7 @@ pub struct TraceApi { impl TraceApi { /// The client that can interact with the chain. pub fn client(&self) -> &Client { - &self.client + &self.inner.client } /// Create a new instance of the [TraceApi] @@ -64,14 +52,21 @@ impl TraceApi { task_spawner: Box, tracing_call_guard: TracingCallGuard, ) -> Self { - Self { client, eth_api, eth_cache, task_spawner, tracing_call_guard } + let inner = Arc::new(TraceApiInner { + client, + eth_api, + eth_cache, + task_spawner, + tracing_call_guard, + }); + Self { inner } } /// Acquires a permit to execute a tracing call. async fn acquire_trace_permit( &self, ) -> std::result::Result { - self.tracing_call_guard.clone().acquire_owned().await + self.inner.tracing_call_guard.clone().acquire_owned().await } } @@ -82,6 +77,23 @@ where Client: BlockProvider + StateProviderFactory + EvmEnvProvider + 'static, Eth: EthTransactions + 'static, { + /// Executes the future on a new blocking task. + async fn on_blocking_task(&self, c: C) -> EthResult + where + C: FnOnce(Self) -> F, + F: Future> + Send + 'static, + R: Send + 'static, + { + let (tx, rx) = oneshot::channel(); + let this = self.clone(); + let f = c(this); + self.inner.task_spawner.spawn_blocking(Box::pin(async move { + let res = f.await; + let _ = tx.send(res); + })); + rx.await.map_err(|_| EthApiError::InternalTracingError)? + } + /// Executes the given call and returns a number of possible traces for it. pub async fn trace_call( &self, @@ -89,12 +101,23 @@ where trace_types: HashSet, block_id: Option, ) -> EthResult { - let _permit = self.acquire_trace_permit().await; + self.on_blocking_task(|this| async move { + this.try_trace_call(call, trace_types, block_id).await + }) + .await + } + + async fn try_trace_call( + &self, + call: CallRequest, + trace_types: HashSet, + block_id: Option, + ) -> EthResult { let at = block_id.unwrap_or(BlockId::Number(BlockNumberOrTag::Latest)); let config = tracing_config(&trace_types); let mut inspector = TracingInspector::new(config); - let (res, _) = self.eth_api.inspect_call_at(call, at, None, &mut inspector).await?; + let (res, _) = self.inner.eth_api.inspect_call_at(call, at, None, &mut inspector).await?; let trace_res = inspector.into_parity_builder().into_trace_results(res.result, &trace_types); @@ -108,10 +131,10 @@ where trace_types: HashSet, block_id: Option, ) -> EthResult { - let _permit = self.acquire_trace_permit().await; let tx = recover_raw_transaction(tx)?; let (cfg, block, at) = self + .inner .eth_api .evm_env_at(block_id.unwrap_or(BlockId::Number(BlockNumberOrTag::Latest))) .await?; @@ -120,42 +143,50 @@ where let config = tracing_config(&trace_types); - self.eth_api.trace_at(env, config, at, |inspector, res| { - let trace_res = - inspector.into_parity_builder().into_trace_results(res.result, &trace_types); - Ok(trace_res) + self.on_blocking_task(|this| async move { + this.inner.eth_api.trace_at(env, config, at, |inspector, res| { + let trace_res = + inspector.into_parity_builder().into_trace_results(res.result, &trace_types); + Ok(trace_res) + }) }) + .await } /// Performs multiple call traces on top of the same block. i.e. transaction n will be executed /// on top of a pending block with all n-1 transactions applied (traced) first. /// - /// Note: Allows to trace dependent transactions, hence all transactions are traced in sequence + /// Note: Allows tracing dependent transactions, hence all transactions are traced in sequence pub async fn trace_call_many( &self, calls: Vec<(CallRequest, HashSet)>, block_id: Option, ) -> EthResult> { let at = block_id.unwrap_or(BlockId::Number(BlockNumberOrTag::Pending)); - let (cfg, block_env, at) = self.eth_api.evm_env_at(at).await?; + let (cfg, block_env, at) = self.inner.eth_api.evm_env_at(at).await?; - // execute all transactions on top of each other and record the traces - self.eth_api.with_state_at_block(at, move |state| { - let mut results = Vec::with_capacity(calls.len()); - let mut db = SubState::new(State::new(state)); + self.on_blocking_task(|this| async move { + // execute all transactions on top of each other and record the traces + this.inner.eth_api.with_state_at_block(at, move |state| { + let mut results = Vec::with_capacity(calls.len()); + let mut db = SubState::new(State::new(state)); - for (call, trace_types) in calls { - let env = prepare_call_env(cfg.clone(), block_env.clone(), call, &mut db, None)?; - let config = tracing_config(&trace_types); - let mut inspector = TracingInspector::new(config); - let (res, _) = inspect(&mut db, env, &mut inspector)?; - let trace_res = - inspector.into_parity_builder().into_trace_results(res.result, &trace_types); - results.push(trace_res); - } + for (call, trace_types) in calls { + let env = + prepare_call_env(cfg.clone(), block_env.clone(), call, &mut db, None)?; + let config = tracing_config(&trace_types); + let mut inspector = TracingInspector::new(config); + let (res, _) = inspect(&mut db, env, &mut inspector)?; + let trace_res = inspector + .into_parity_builder() + .into_trace_results(res.result, &trace_types); + results.push(trace_res); + } - Ok(results) + Ok(results) + }) }) + .await } /// Replays a transaction, returning the traces. @@ -165,15 +196,20 @@ where trace_types: HashSet, ) -> EthResult { let config = tracing_config(&trace_types); - self.eth_api - .trace_transaction_in_block(hash, config, |_, inspector, res| { - let trace_res = - inspector.into_parity_builder().into_trace_results(res.result, &trace_types); - Ok(trace_res) - }) - .await - .transpose() - .ok_or_else(|| EthApiError::TransactionNotFound)? + self.on_blocking_task(|this| async move { + this.inner + .eth_api + .trace_transaction_in_block(hash, config, |_, inspector, res| { + let trace_res = inspector + .into_parity_builder() + .into_trace_results(res.result, &trace_types); + Ok(trace_res) + }) + .await + .transpose() + .ok_or_else(|| EthApiError::TransactionNotFound)? + }) + .await } /// Returns transaction trace with the given address. @@ -182,8 +218,6 @@ where hash: H256, trace_address: Vec, ) -> EthResult> { - let _permit = self.acquire_trace_permit().await; - match self.trace_transaction(hash).await? { None => Ok(None), Some(traces) => { @@ -199,19 +233,22 @@ where &self, hash: H256, ) -> EthResult>> { - let _permit = self.acquire_trace_permit().await; - - self.eth_api - .trace_transaction_in_block( - hash, - TracingInspectorConfig::default_parity(), - |tx_info, inspector, _| { - let traces = - inspector.into_parity_builder().into_localized_transaction_traces(tx_info); - Ok(traces) - }, - ) - .await + self.on_blocking_task(|this| async move { + this.inner + .eth_api + .trace_transaction_in_block( + hash, + TracingInspectorConfig::default_parity(), + |tx_info, inspector, _| { + let traces = inspector + .into_parity_builder() + .into_localized_transaction_traces(tx_info); + Ok(traces) + }, + ) + .await + }) + .await } /// Executes all transactions of a block and returns a list of callback results. @@ -222,11 +259,12 @@ where f: F, ) -> EthResult>> where - F: Fn(TransactionInfo, TracingInspector, ExecutionResult) -> EthResult + Send, + F: Fn(TransactionInfo, TracingInspector, ExecutionResult) -> EthResult + Send + 'static, + R: Send + 'static, { let ((cfg, block_env, _), block) = futures::try_join!( - self.eth_api.evm_env_at(block_id), - self.eth_api.block_by_id(block_id), + self.inner.eth_api.evm_env_at(block_id), + self.inner.eth_api.block_by_id(block_id), )?; let block = match block { @@ -241,43 +279,47 @@ where let block_hash = block.hash; let transactions = block.body; - // replay all transactions of the block - self.eth_api - .with_state_at_block(state_at.into(), move |state| { - let mut results = Vec::with_capacity(transactions.len()); - let mut db = SubState::new(State::new(state)); + self.on_blocking_task(|this| async move { + // replay all transactions of the block + this.inner + .eth_api + .with_state_at_block(state_at.into(), move |state| { + let mut results = Vec::with_capacity(transactions.len()); + let mut db = SubState::new(State::new(state)); - let mut transactions = transactions.into_iter().enumerate().peekable(); + let mut transactions = transactions.into_iter().enumerate().peekable(); - while let Some((idx, tx)) = transactions.next() { - let tx = tx.into_ecrecovered().ok_or(BlockError::InvalidSignature)?; - let tx_info = TransactionInfo { - hash: Some(tx.hash()), - index: Some(idx as u64), - block_hash: Some(block_hash), - block_number: Some(block_env.number.try_into().unwrap_or(u64::MAX)), - base_fee: Some(block_env.basefee.try_into().unwrap_or(u64::MAX)), - }; + while let Some((idx, tx)) = transactions.next() { + let tx = tx.into_ecrecovered().ok_or(BlockError::InvalidSignature)?; + let tx_info = TransactionInfo { + hash: Some(tx.hash()), + index: Some(idx as u64), + block_hash: Some(block_hash), + block_number: Some(block_env.number.try_into().unwrap_or(u64::MAX)), + base_fee: Some(block_env.basefee.try_into().unwrap_or(u64::MAX)), + }; - let tx = tx_env_with_recovered(&tx); - let env = Env { cfg: cfg.clone(), block: block_env.clone(), tx }; + let tx = tx_env_with_recovered(&tx); + let env = Env { cfg: cfg.clone(), block: block_env.clone(), tx }; - let mut inspector = TracingInspector::new(config); - let (res, _) = inspect(&mut db, env, &mut inspector)?; - results.push(f(tx_info, inspector, res.result)?); + let mut inspector = TracingInspector::new(config); + let (res, _) = inspect(&mut db, env, &mut inspector)?; + results.push(f(tx_info, inspector, res.result)?); - // need to apply the state changes of this transaction before executing the next - // transaction - if transactions.peek().is_some() { // need to apply the state changes of this transaction before executing the // next transaction - db.commit(res.state) + if transactions.peek().is_some() { + // need to apply the state changes of this transaction before executing + // the next transaction + db.commit(res.state) + } } - } - Ok(results) - }) - .map(Some) + Ok(results) + }) + .map(Some) + }) + .await } /// Returns traces created at given block. @@ -300,20 +342,25 @@ where Ok(traces) } - /// Replays all transaction in a block + /// Replays all transactions in a block pub async fn replay_block_transactions( &self, block_id: BlockId, trace_types: HashSet, ) -> EthResult>> { - self.trace_block_with(block_id, tracing_config(&trace_types), |tx_info, inspector, res| { - let full_trace = inspector.into_parity_builder().into_trace_results(res, &trace_types); - let trace = TraceResultsWithTransactionHash { - transaction_hash: tx_info.hash.expect("tx hash is set"), - full_trace, - }; - Ok(trace) - }) + self.trace_block_with( + block_id, + tracing_config(&trace_types), + move |tx_info, inspector, res| { + let full_trace = + inspector.into_parity_builder().into_trace_results(res, &trace_types); + let trace = TraceResultsWithTransactionHash { + transaction_hash: tx_info.hash.expect("tx hash is set"), + full_trace, + }; + Ok(trace) + }, + ) .await } } @@ -333,6 +380,7 @@ where trace_types: HashSet, block_id: Option, ) -> Result { + let _permit = self.acquire_trace_permit().await; Ok(TraceApi::trace_call(self, call, trace_types, block_id).await?) } @@ -342,6 +390,7 @@ where calls: Vec<(CallRequest, HashSet)>, block_id: Option, ) -> Result> { + let _permit = self.acquire_trace_permit().await; Ok(TraceApi::trace_call_many(self, calls, block_id).await?) } @@ -352,6 +401,7 @@ where trace_types: HashSet, block_id: Option, ) -> Result { + let _permit = self.acquire_trace_permit().await; Ok(TraceApi::trace_raw_transaction(self, data, trace_types, block_id).await?) } @@ -361,6 +411,7 @@ where block_id: BlockId, trace_types: HashSet, ) -> Result>> { + let _permit = self.acquire_trace_permit().await; Ok(TraceApi::replay_block_transactions(self, block_id, trace_types).await?) } @@ -370,6 +421,7 @@ where transaction: H256, trace_types: HashSet, ) -> Result { + let _permit = self.acquire_trace_permit().await; Ok(TraceApi::replay_transaction(self, transaction, trace_types).await?) } @@ -378,6 +430,7 @@ where &self, block_id: BlockId, ) -> Result>> { + let _permit = self.acquire_trace_permit().await; Ok(TraceApi::trace_block(self, block_id).await?) } @@ -393,6 +446,7 @@ where hash: H256, indices: Vec, ) -> Result> { + let _permit = self.acquire_trace_permit().await; Ok(TraceApi::trace_get(self, hash, indices.into_iter().map(Into::into).collect()).await?) } @@ -401,6 +455,7 @@ where &self, hash: H256, ) -> Result>> { + let _permit = self.acquire_trace_permit().await; Ok(TraceApi::trace_transaction(self, hash).await?) } } @@ -410,6 +465,25 @@ impl std::fmt::Debug for TraceApi { f.debug_struct("TraceApi").finish_non_exhaustive() } } +impl Clone for TraceApi { + fn clone(&self) -> Self { + Self { inner: Arc::clone(&self.inner) } + } +} + +struct TraceApiInner { + /// The client that can interact with the chain. + client: Client, + /// Access to commonly used code of the `eth` namespace + eth_api: Eth, + /// The async cache frontend for eth-related data + #[allow(unused)] // we need this for trace_filter eventually + eth_cache: EthStateCache, + /// The type that can spawn tasks which would otherwise be blocking. + task_spawner: Box, + // restrict the number of concurrent calls to `trace_*` + tracing_call_guard: TracingCallGuard, +} /// Returns the [TracingInspectorConfig] depending on the enabled [TraceType]s fn tracing_config(trace_types: &HashSet) -> TracingInspectorConfig {