perf: spawn blocking trace calls (#2685)

This commit is contained in:
Matthias Seitz
2023-05-15 22:37:29 +02:00
committed by GitHub
parent 2fa9cd066e
commit bf9a3c5338
3 changed files with 307 additions and 145 deletions

View File

@@ -29,22 +29,14 @@ use reth_rpc_types::{
use reth_tasks::TaskSpawner;
use revm::primitives::Env;
use revm_primitives::{db::DatabaseCommit, BlockEnv, CfgEnv};
use tokio::sync::{AcquireError, OwnedSemaphorePermit};
use std::{future::Future, sync::Arc};
use tokio::sync::{oneshot, AcquireError, OwnedSemaphorePermit};
/// `debug` API implementation.
///
/// This type provides the functionality for handling `debug` related requests.
#[non_exhaustive]
pub struct DebugApi<Client, Eth> {
/// The client that can interact with the chain.
client: Client,
/// The implementation of `eth` API
eth_api: Eth,
// restrict the number of concurrent calls to tracing calls
tracing_call_guard: TracingCallGuard,
/// The type that can spawn tasks which would otherwise block.
#[allow(unused)]
task_spawner: Box<dyn TaskSpawner>,
inner: Arc<DebugApiInner<Client, Eth>>,
}
// === impl DebugApi ===
@@ -57,7 +49,9 @@ impl<Client, Eth> DebugApi<Client, Eth> {
task_spawner: Box<dyn TaskSpawner>,
tracing_call_guard: TracingCallGuard,
) -> Self {
Self { client, eth_api: eth, task_spawner, tracing_call_guard }
let inner =
Arc::new(DebugApiInner { client, eth_api: eth, task_spawner, tracing_call_guard });
Self { inner }
}
}
@@ -68,13 +62,30 @@ where
Client: BlockProviderIdExt + HeaderProvider + 'static,
Eth: EthTransactions + 'static,
{
/// Executes the future on a new blocking task.
async fn on_blocking_task<C, F, R>(&self, c: C) -> EthResult<R>
where
C: FnOnce(Self) -> F,
F: Future<Output = EthResult<R>> + Send + 'static,
R: Send + 'static,
{
let (tx, rx) = oneshot::channel();
let this = self.clone();
let f = c(this);
self.inner.task_spawner.spawn_blocking(Box::pin(async move {
let res = f.await;
let _ = tx.send(res);
}));
rx.await.map_err(|_| EthApiError::InternalTracingError)?
}
/// Acquires a permit to execute a tracing call.
async fn acquire_trace_permit(&self) -> Result<OwnedSemaphorePermit, AcquireError> {
self.tracing_call_guard.clone().acquire_owned().await
self.inner.tracing_call_guard.clone().acquire_owned().await
}
/// Trace the entire block
fn trace_block_with(
fn trace_block_with_sync(
&self,
at: BlockId,
transactions: Vec<TransactionSigned>,
@@ -83,7 +94,7 @@ where
opts: GethDebugTracingOptions,
) -> EthResult<Vec<TraceResult>> {
// replay all transactions of the block
self.eth_api.with_state_at_block(at, move |state| {
self.inner.eth_api.with_state_at_block(at, move |state| {
let mut results = Vec::with_capacity(transactions.len());
let mut db = SubState::new(State::new(state));
@@ -106,6 +117,21 @@ where
})
}
/// Trace the entire block asynchronously
async fn trace_block_with(
&self,
at: BlockId,
transactions: Vec<TransactionSigned>,
cfg: CfgEnv,
block_env: BlockEnv,
opts: GethDebugTracingOptions,
) -> EthResult<Vec<TraceResult>> {
self.on_blocking_task(|this| async move {
this.trace_block_with_sync(at, transactions, cfg, block_env, opts)
})
.await
}
/// Replays the given block and returns the trace of each transaction.
///
/// This expects a rlp encoded block
@@ -119,11 +145,11 @@ where
let block =
Block::decode(&mut rlp_block.as_ref()).map_err(BlockError::RlpDecodeRawBlock)?;
let (cfg, block_env) = self.eth_api.evm_env_for_raw_block(&block.header).await?;
let (cfg, block_env) = self.inner.eth_api.evm_env_for_raw_block(&block.header).await?;
// we trace on top the block's parent block
let parent = block.parent_hash;
self.trace_block_with(parent.into(), block.body, cfg, block_env, opts)
self.trace_block_with(parent.into(), block.body, cfg, block_env, opts).await
}
/// Replays a block and returns the trace of each transaction.
@@ -131,15 +157,27 @@ where
&self,
block_id: BlockId,
opts: GethDebugTracingOptions,
) -> EthResult<Vec<TraceResult>> {
self.on_blocking_task(
|this| async move { this.try_debug_trace_block(block_id, opts).await },
)
.await
}
async fn try_debug_trace_block(
&self,
block_id: BlockId,
opts: GethDebugTracingOptions,
) -> EthResult<Vec<TraceResult>> {
let block_hash = self
.inner
.client
.block_hash_for_id(block_id)?
.ok_or_else(|| EthApiError::UnknownBlockNumber)?;
let ((cfg, block_env, _), block) = futures::try_join!(
self.eth_api.evm_env_at(block_hash.into()),
self.eth_api.block_by_id(block_id),
self.inner.eth_api.evm_env_at(block_hash.into()),
self.inner.eth_api.block_by_id(block_id),
)?;
let block = block.ok_or_else(|| EthApiError::UnknownBlockNumber)?;
@@ -147,7 +185,7 @@ where
// its parent block's state
let state_at = block.parent_hash;
self.trace_block_with(state_at.into(), block.body, cfg, block_env, opts)
self.trace_block_with_sync(state_at.into(), block.body, cfg, block_env, opts)
}
/// Trace the transaction according to the provided options.
@@ -158,28 +196,37 @@ where
tx_hash: H256,
opts: GethDebugTracingOptions,
) -> EthResult<GethTraceFrame> {
let (transaction, block) = match self.eth_api.transaction_and_block(tx_hash).await? {
let (transaction, block) = match self.inner.eth_api.transaction_and_block(tx_hash).await? {
None => return Err(EthApiError::TransactionNotFound),
Some(res) => res,
};
let (cfg, block_env, _) = self.eth_api.evm_env_at(block.hash.into()).await?;
let (cfg, block_env, _) = self.inner.eth_api.evm_env_at(block.hash.into()).await?;
// we need to get the state of the parent block because we're essentially replaying the
// block the transaction is included in
let state_at = block.parent_hash;
let block_txs = block.body;
self.eth_api.with_state_at_block(state_at.into(), |state| {
// configure env for the target transaction
let tx = transaction.into_recovered();
self.on_blocking_task(|this| async move {
this.inner.eth_api.with_state_at_block(state_at.into(), |state| {
// configure env for the target transaction
let tx = transaction.into_recovered();
let mut db = SubState::new(State::new(state));
// replay all transactions prior to the targeted transaction
replay_transactions_until(&mut db, cfg.clone(), block_env.clone(), block_txs, tx.hash)?;
let mut db = SubState::new(State::new(state));
// replay all transactions prior to the targeted transaction
replay_transactions_until(
&mut db,
cfg.clone(),
block_env.clone(),
block_txs,
tx.hash,
)?;
let env = Env { cfg, block: block_env, tx: tx_env_with_recovered(&tx) };
trace_transaction(opts, env, &mut db).map(|(trace, _)| trace)
let env = Env { cfg, block: block_env, tx: tx_env_with_recovered(&tx) };
trace_transaction(opts, env, &mut db).map(|(trace, _)| trace)
})
})
.await
}
/// The debug_traceCall method lets you run an `eth_call` within the context of the given block
@@ -189,6 +236,22 @@ where
call: CallRequest,
block_id: Option<BlockId>,
opts: GethDebugTracingCallOptions,
) -> EthResult<GethTraceFrame> {
self.on_blocking_task(|this| async move {
this.try_debug_trace_call(call, block_id, opts).await
})
.await
}
/// The debug_traceCall method lets you run an `eth_call` within the context of the given block
/// execution using the final state of parent block as the base.
///
/// Caution: while this is async, this may still be blocking on necessary DB io.
async fn try_debug_trace_call(
&self,
call: CallRequest,
block_id: Option<BlockId>,
opts: GethDebugTracingCallOptions,
) -> EthResult<GethTraceFrame> {
let at = block_id.unwrap_or(BlockId::Number(BlockNumberOrTag::Latest));
// TODO(mattsse) apply block overrides
@@ -209,6 +272,7 @@ where
GethDebugBuiltInTracerType::FourByteTracer => {
let mut inspector = FourByteInspector::default();
let (_res, _) = self
.inner
.eth_api
.inspect_call_at(call, at, state_overrides, &mut inspector)
.await?;
@@ -224,6 +288,7 @@ where
);
let _ = self
.inner
.eth_api
.inspect_call_at(call, at, state_overrides, &mut inspector)
.await?;
@@ -249,7 +314,7 @@ where
let mut inspector = TracingInspector::new(inspector_config);
let (res, _) =
self.eth_api.inspect_call_at(call, at, state_overrides, &mut inspector).await?;
self.inner.eth_api.inspect_call_at(call, at, state_overrides, &mut inspector).await?;
let gas_used = res.result.gas_used();
let frame = inspector.into_geth_builder().geth_traces(U256::from(gas_used), config);
@@ -267,13 +332,15 @@ where
/// Handler for `debug_getRawHeader`
async fn raw_header(&self, block_id: BlockId) -> RpcResult<Bytes> {
let header = match block_id {
BlockId::Hash(hash) => self.client.header(&hash.into()).to_rpc_result()?,
BlockId::Hash(hash) => self.inner.client.header(&hash.into()).to_rpc_result()?,
BlockId::Number(number_or_tag) => {
let number =
self.client.convert_block_number(number_or_tag).to_rpc_result()?.ok_or_else(
|| internal_rpc_err("Pending block not supported".to_string()),
)?;
self.client.header_by_number(number).to_rpc_result()?
let number = self
.inner
.client
.convert_block_number(number_or_tag)
.to_rpc_result()?
.ok_or_else(|| internal_rpc_err("Pending block not supported".to_string()))?;
self.inner.client.header_by_number(number).to_rpc_result()?
}
};
@@ -287,7 +354,7 @@ where
/// Handler for `debug_getRawBlock`
async fn raw_block(&self, block_id: BlockId) -> RpcResult<Bytes> {
let block = self.client.block_by_id(block_id).to_rpc_result()?;
let block = self.inner.client.block_by_id(block_id).to_rpc_result()?;
let mut res = Vec::new();
if let Some(mut block) = block {
@@ -304,7 +371,7 @@ where
/// Handler for `debug_getRawTransaction`
/// Returns the bytes of the transaction for the given hash.
async fn raw_transaction(&self, hash: H256) -> RpcResult<Bytes> {
let tx = self.eth_api.transaction_by_hash(hash).await?;
let tx = self.inner.eth_api.transaction_by_hash(hash).await?;
let mut res = Vec::new();
if let Some(tx) = tx.map(TransactionSource::into_recovered) {
@@ -317,7 +384,7 @@ where
/// Handler for `debug_getRawReceipts`
async fn raw_receipts(&self, block_id: BlockId) -> RpcResult<Vec<Bytes>> {
let receipts =
self.client.receipts_by_block_id(block_id).to_rpc_result()?.unwrap_or_default();
self.inner.client.receipts_by_block_id(block_id).to_rpc_result()?.unwrap_or_default();
let mut all_receipts = Vec::with_capacity(receipts.len());
for receipt in receipts {
@@ -402,6 +469,23 @@ impl<Client, Eth> std::fmt::Debug for DebugApi<Client, Eth> {
}
}
impl<Client, Eth> Clone for DebugApi<Client, Eth> {
fn clone(&self) -> Self {
Self { inner: Arc::clone(&self.inner) }
}
}
struct DebugApiInner<Client, Eth> {
/// The client that can interact with the chain.
client: Client,
/// The implementation of `eth` API
eth_api: Eth,
// restrict the number of concurrent calls to tracing calls
tracing_call_guard: TracingCallGuard,
/// The type that can spawn tasks which would otherwise block.
task_spawner: Box<dyn TaskSpawner>,
}
/// Executes the configured transaction with the environment on the given database.
///
/// Returns the trace frame and the state that got updated after executing the transaction.

View File

@@ -63,6 +63,9 @@ pub enum EthApiError {
/// Percentile array is invalid
#[error("invalid reward percentile")]
InvalidRewardPercentile(f64),
/// Error thrown when a spawned tracing task failed to deliver an anticipated response.
#[error("Internal error while tracing")]
InternalTracingError,
}
impl From<EthApiError> for ErrorObject<'static> {
@@ -87,6 +90,7 @@ impl From<EthApiError> for ErrorObject<'static> {
}
EthApiError::Unsupported(msg) => internal_rpc_err(msg),
EthApiError::InvalidRewardPercentile(msg) => internal_rpc_err(msg.to_string()),
err @ EthApiError::InternalTracingError => internal_rpc_err(err.to_string()),
}
}
}

View File

@@ -26,26 +26,14 @@ use reth_rpc_types::{
use reth_tasks::TaskSpawner;
use revm::primitives::Env;
use revm_primitives::{db::DatabaseCommit, ExecutionResult};
use std::collections::HashSet;
use tokio::sync::{AcquireError, OwnedSemaphorePermit};
use std::{collections::HashSet, future::Future, sync::Arc};
use tokio::sync::{oneshot, AcquireError, OwnedSemaphorePermit};
/// `trace` API implementation.
///
/// This type provides the functionality for handling `trace` related requests.
#[derive(Clone)]
pub struct TraceApi<Client, Eth> {
/// The client that can interact with the chain.
client: Client,
/// Access to commonly used code of the `eth` namespace
eth_api: Eth,
/// The async cache frontend for eth-related data
#[allow(unused)] // we need this for trace_filter eventually
eth_cache: EthStateCache,
/// The type that can spawn tasks which would otherwise be blocking.
#[allow(unused)]
task_spawner: Box<dyn TaskSpawner>,
// restrict the number of concurrent calls to `trace_*`
tracing_call_guard: TracingCallGuard,
inner: Arc<TraceApiInner<Client, Eth>>,
}
// === impl TraceApi ===
@@ -53,7 +41,7 @@ pub struct TraceApi<Client, Eth> {
impl<Client, Eth> TraceApi<Client, Eth> {
/// The client that can interact with the chain.
pub fn client(&self) -> &Client {
&self.client
&self.inner.client
}
/// Create a new instance of the [TraceApi]
@@ -64,14 +52,21 @@ impl<Client, Eth> TraceApi<Client, Eth> {
task_spawner: Box<dyn TaskSpawner>,
tracing_call_guard: TracingCallGuard,
) -> Self {
Self { client, eth_api, eth_cache, task_spawner, tracing_call_guard }
let inner = Arc::new(TraceApiInner {
client,
eth_api,
eth_cache,
task_spawner,
tracing_call_guard,
});
Self { inner }
}
/// Acquires a permit to execute a tracing call.
async fn acquire_trace_permit(
&self,
) -> std::result::Result<OwnedSemaphorePermit, AcquireError> {
self.tracing_call_guard.clone().acquire_owned().await
self.inner.tracing_call_guard.clone().acquire_owned().await
}
}
@@ -82,6 +77,23 @@ where
Client: BlockProvider + StateProviderFactory + EvmEnvProvider + 'static,
Eth: EthTransactions + 'static,
{
/// Executes the future on a new blocking task.
async fn on_blocking_task<C, F, R>(&self, c: C) -> EthResult<R>
where
C: FnOnce(Self) -> F,
F: Future<Output = EthResult<R>> + Send + 'static,
R: Send + 'static,
{
let (tx, rx) = oneshot::channel();
let this = self.clone();
let f = c(this);
self.inner.task_spawner.spawn_blocking(Box::pin(async move {
let res = f.await;
let _ = tx.send(res);
}));
rx.await.map_err(|_| EthApiError::InternalTracingError)?
}
/// Executes the given call and returns a number of possible traces for it.
pub async fn trace_call(
&self,
@@ -89,12 +101,23 @@ where
trace_types: HashSet<TraceType>,
block_id: Option<BlockId>,
) -> EthResult<TraceResults> {
let _permit = self.acquire_trace_permit().await;
self.on_blocking_task(|this| async move {
this.try_trace_call(call, trace_types, block_id).await
})
.await
}
async fn try_trace_call(
&self,
call: CallRequest,
trace_types: HashSet<TraceType>,
block_id: Option<BlockId>,
) -> EthResult<TraceResults> {
let at = block_id.unwrap_or(BlockId::Number(BlockNumberOrTag::Latest));
let config = tracing_config(&trace_types);
let mut inspector = TracingInspector::new(config);
let (res, _) = self.eth_api.inspect_call_at(call, at, None, &mut inspector).await?;
let (res, _) = self.inner.eth_api.inspect_call_at(call, at, None, &mut inspector).await?;
let trace_res =
inspector.into_parity_builder().into_trace_results(res.result, &trace_types);
@@ -108,10 +131,10 @@ where
trace_types: HashSet<TraceType>,
block_id: Option<BlockId>,
) -> EthResult<TraceResults> {
let _permit = self.acquire_trace_permit().await;
let tx = recover_raw_transaction(tx)?;
let (cfg, block, at) = self
.inner
.eth_api
.evm_env_at(block_id.unwrap_or(BlockId::Number(BlockNumberOrTag::Latest)))
.await?;
@@ -120,42 +143,50 @@ where
let config = tracing_config(&trace_types);
self.eth_api.trace_at(env, config, at, |inspector, res| {
let trace_res =
inspector.into_parity_builder().into_trace_results(res.result, &trace_types);
Ok(trace_res)
self.on_blocking_task(|this| async move {
this.inner.eth_api.trace_at(env, config, at, |inspector, res| {
let trace_res =
inspector.into_parity_builder().into_trace_results(res.result, &trace_types);
Ok(trace_res)
})
})
.await
}
/// Performs multiple call traces on top of the same block. i.e. transaction n will be executed
/// on top of a pending block with all n-1 transactions applied (traced) first.
///
/// Note: Allows to trace dependent transactions, hence all transactions are traced in sequence
/// Note: Allows tracing dependent transactions, hence all transactions are traced in sequence
pub async fn trace_call_many(
&self,
calls: Vec<(CallRequest, HashSet<TraceType>)>,
block_id: Option<BlockId>,
) -> EthResult<Vec<TraceResults>> {
let at = block_id.unwrap_or(BlockId::Number(BlockNumberOrTag::Pending));
let (cfg, block_env, at) = self.eth_api.evm_env_at(at).await?;
let (cfg, block_env, at) = self.inner.eth_api.evm_env_at(at).await?;
// execute all transactions on top of each other and record the traces
self.eth_api.with_state_at_block(at, move |state| {
let mut results = Vec::with_capacity(calls.len());
let mut db = SubState::new(State::new(state));
self.on_blocking_task(|this| async move {
// execute all transactions on top of each other and record the traces
this.inner.eth_api.with_state_at_block(at, move |state| {
let mut results = Vec::with_capacity(calls.len());
let mut db = SubState::new(State::new(state));
for (call, trace_types) in calls {
let env = prepare_call_env(cfg.clone(), block_env.clone(), call, &mut db, None)?;
let config = tracing_config(&trace_types);
let mut inspector = TracingInspector::new(config);
let (res, _) = inspect(&mut db, env, &mut inspector)?;
let trace_res =
inspector.into_parity_builder().into_trace_results(res.result, &trace_types);
results.push(trace_res);
}
for (call, trace_types) in calls {
let env =
prepare_call_env(cfg.clone(), block_env.clone(), call, &mut db, None)?;
let config = tracing_config(&trace_types);
let mut inspector = TracingInspector::new(config);
let (res, _) = inspect(&mut db, env, &mut inspector)?;
let trace_res = inspector
.into_parity_builder()
.into_trace_results(res.result, &trace_types);
results.push(trace_res);
}
Ok(results)
Ok(results)
})
})
.await
}
/// Replays a transaction, returning the traces.
@@ -165,15 +196,20 @@ where
trace_types: HashSet<TraceType>,
) -> EthResult<TraceResults> {
let config = tracing_config(&trace_types);
self.eth_api
.trace_transaction_in_block(hash, config, |_, inspector, res| {
let trace_res =
inspector.into_parity_builder().into_trace_results(res.result, &trace_types);
Ok(trace_res)
})
.await
.transpose()
.ok_or_else(|| EthApiError::TransactionNotFound)?
self.on_blocking_task(|this| async move {
this.inner
.eth_api
.trace_transaction_in_block(hash, config, |_, inspector, res| {
let trace_res = inspector
.into_parity_builder()
.into_trace_results(res.result, &trace_types);
Ok(trace_res)
})
.await
.transpose()
.ok_or_else(|| EthApiError::TransactionNotFound)?
})
.await
}
/// Returns transaction trace with the given address.
@@ -182,8 +218,6 @@ where
hash: H256,
trace_address: Vec<usize>,
) -> EthResult<Option<LocalizedTransactionTrace>> {
let _permit = self.acquire_trace_permit().await;
match self.trace_transaction(hash).await? {
None => Ok(None),
Some(traces) => {
@@ -199,19 +233,22 @@ where
&self,
hash: H256,
) -> EthResult<Option<Vec<LocalizedTransactionTrace>>> {
let _permit = self.acquire_trace_permit().await;
self.eth_api
.trace_transaction_in_block(
hash,
TracingInspectorConfig::default_parity(),
|tx_info, inspector, _| {
let traces =
inspector.into_parity_builder().into_localized_transaction_traces(tx_info);
Ok(traces)
},
)
.await
self.on_blocking_task(|this| async move {
this.inner
.eth_api
.trace_transaction_in_block(
hash,
TracingInspectorConfig::default_parity(),
|tx_info, inspector, _| {
let traces = inspector
.into_parity_builder()
.into_localized_transaction_traces(tx_info);
Ok(traces)
},
)
.await
})
.await
}
/// Executes all transactions of a block and returns a list of callback results.
@@ -222,11 +259,12 @@ where
f: F,
) -> EthResult<Option<Vec<R>>>
where
F: Fn(TransactionInfo, TracingInspector, ExecutionResult) -> EthResult<R> + Send,
F: Fn(TransactionInfo, TracingInspector, ExecutionResult) -> EthResult<R> + Send + 'static,
R: Send + 'static,
{
let ((cfg, block_env, _), block) = futures::try_join!(
self.eth_api.evm_env_at(block_id),
self.eth_api.block_by_id(block_id),
self.inner.eth_api.evm_env_at(block_id),
self.inner.eth_api.block_by_id(block_id),
)?;
let block = match block {
@@ -241,43 +279,47 @@ where
let block_hash = block.hash;
let transactions = block.body;
// replay all transactions of the block
self.eth_api
.with_state_at_block(state_at.into(), move |state| {
let mut results = Vec::with_capacity(transactions.len());
let mut db = SubState::new(State::new(state));
self.on_blocking_task(|this| async move {
// replay all transactions of the block
this.inner
.eth_api
.with_state_at_block(state_at.into(), move |state| {
let mut results = Vec::with_capacity(transactions.len());
let mut db = SubState::new(State::new(state));
let mut transactions = transactions.into_iter().enumerate().peekable();
let mut transactions = transactions.into_iter().enumerate().peekable();
while let Some((idx, tx)) = transactions.next() {
let tx = tx.into_ecrecovered().ok_or(BlockError::InvalidSignature)?;
let tx_info = TransactionInfo {
hash: Some(tx.hash()),
index: Some(idx as u64),
block_hash: Some(block_hash),
block_number: Some(block_env.number.try_into().unwrap_or(u64::MAX)),
base_fee: Some(block_env.basefee.try_into().unwrap_or(u64::MAX)),
};
while let Some((idx, tx)) = transactions.next() {
let tx = tx.into_ecrecovered().ok_or(BlockError::InvalidSignature)?;
let tx_info = TransactionInfo {
hash: Some(tx.hash()),
index: Some(idx as u64),
block_hash: Some(block_hash),
block_number: Some(block_env.number.try_into().unwrap_or(u64::MAX)),
base_fee: Some(block_env.basefee.try_into().unwrap_or(u64::MAX)),
};
let tx = tx_env_with_recovered(&tx);
let env = Env { cfg: cfg.clone(), block: block_env.clone(), tx };
let tx = tx_env_with_recovered(&tx);
let env = Env { cfg: cfg.clone(), block: block_env.clone(), tx };
let mut inspector = TracingInspector::new(config);
let (res, _) = inspect(&mut db, env, &mut inspector)?;
results.push(f(tx_info, inspector, res.result)?);
let mut inspector = TracingInspector::new(config);
let (res, _) = inspect(&mut db, env, &mut inspector)?;
results.push(f(tx_info, inspector, res.result)?);
// need to apply the state changes of this transaction before executing the next
// transaction
if transactions.peek().is_some() {
// need to apply the state changes of this transaction before executing the
// next transaction
db.commit(res.state)
if transactions.peek().is_some() {
// need to apply the state changes of this transaction before executing
// the next transaction
db.commit(res.state)
}
}
}
Ok(results)
})
.map(Some)
Ok(results)
})
.map(Some)
})
.await
}
/// Returns traces created at given block.
@@ -300,20 +342,25 @@ where
Ok(traces)
}
/// Replays all transaction in a block
/// Replays all transactions in a block
pub async fn replay_block_transactions(
&self,
block_id: BlockId,
trace_types: HashSet<TraceType>,
) -> EthResult<Option<Vec<TraceResultsWithTransactionHash>>> {
self.trace_block_with(block_id, tracing_config(&trace_types), |tx_info, inspector, res| {
let full_trace = inspector.into_parity_builder().into_trace_results(res, &trace_types);
let trace = TraceResultsWithTransactionHash {
transaction_hash: tx_info.hash.expect("tx hash is set"),
full_trace,
};
Ok(trace)
})
self.trace_block_with(
block_id,
tracing_config(&trace_types),
move |tx_info, inspector, res| {
let full_trace =
inspector.into_parity_builder().into_trace_results(res, &trace_types);
let trace = TraceResultsWithTransactionHash {
transaction_hash: tx_info.hash.expect("tx hash is set"),
full_trace,
};
Ok(trace)
},
)
.await
}
}
@@ -333,6 +380,7 @@ where
trace_types: HashSet<TraceType>,
block_id: Option<BlockId>,
) -> Result<TraceResults> {
let _permit = self.acquire_trace_permit().await;
Ok(TraceApi::trace_call(self, call, trace_types, block_id).await?)
}
@@ -342,6 +390,7 @@ where
calls: Vec<(CallRequest, HashSet<TraceType>)>,
block_id: Option<BlockId>,
) -> Result<Vec<TraceResults>> {
let _permit = self.acquire_trace_permit().await;
Ok(TraceApi::trace_call_many(self, calls, block_id).await?)
}
@@ -352,6 +401,7 @@ where
trace_types: HashSet<TraceType>,
block_id: Option<BlockId>,
) -> Result<TraceResults> {
let _permit = self.acquire_trace_permit().await;
Ok(TraceApi::trace_raw_transaction(self, data, trace_types, block_id).await?)
}
@@ -361,6 +411,7 @@ where
block_id: BlockId,
trace_types: HashSet<TraceType>,
) -> Result<Option<Vec<TraceResultsWithTransactionHash>>> {
let _permit = self.acquire_trace_permit().await;
Ok(TraceApi::replay_block_transactions(self, block_id, trace_types).await?)
}
@@ -370,6 +421,7 @@ where
transaction: H256,
trace_types: HashSet<TraceType>,
) -> Result<TraceResults> {
let _permit = self.acquire_trace_permit().await;
Ok(TraceApi::replay_transaction(self, transaction, trace_types).await?)
}
@@ -378,6 +430,7 @@ where
&self,
block_id: BlockId,
) -> Result<Option<Vec<LocalizedTransactionTrace>>> {
let _permit = self.acquire_trace_permit().await;
Ok(TraceApi::trace_block(self, block_id).await?)
}
@@ -393,6 +446,7 @@ where
hash: H256,
indices: Vec<Index>,
) -> Result<Option<LocalizedTransactionTrace>> {
let _permit = self.acquire_trace_permit().await;
Ok(TraceApi::trace_get(self, hash, indices.into_iter().map(Into::into).collect()).await?)
}
@@ -401,6 +455,7 @@ where
&self,
hash: H256,
) -> Result<Option<Vec<LocalizedTransactionTrace>>> {
let _permit = self.acquire_trace_permit().await;
Ok(TraceApi::trace_transaction(self, hash).await?)
}
}
@@ -410,6 +465,25 @@ impl<Client, Eth> std::fmt::Debug for TraceApi<Client, Eth> {
f.debug_struct("TraceApi").finish_non_exhaustive()
}
}
impl<Client, Eth> Clone for TraceApi<Client, Eth> {
fn clone(&self) -> Self {
Self { inner: Arc::clone(&self.inner) }
}
}
struct TraceApiInner<Client, Eth> {
/// The client that can interact with the chain.
client: Client,
/// Access to commonly used code of the `eth` namespace
eth_api: Eth,
/// The async cache frontend for eth-related data
#[allow(unused)] // we need this for trace_filter eventually
eth_cache: EthStateCache,
/// The type that can spawn tasks which would otherwise be blocking.
task_spawner: Box<dyn TaskSpawner>,
// restrict the number of concurrent calls to `trace_*`
tracing_call_guard: TracingCallGuard,
}
/// Returns the [TracingInspectorConfig] depending on the enabled [TraceType]s
fn tracing_config(trace_types: &HashSet<TraceType>) -> TracingInspectorConfig {