feat: use tracing pool for tracing calls (#3914)

This commit is contained in:
Matthias Seitz
2023-07-26 18:33:59 +02:00
committed by GitHub
parent 8cdb097829
commit 49e112789b
15 changed files with 308 additions and 361 deletions

3
Cargo.lock generated
View File

@@ -5682,6 +5682,7 @@ dependencies = [
"jsonwebtoken",
"pin-project",
"rand 0.8.5",
"rayon",
"reth-consensus-common",
"reth-interfaces",
"reth-metrics",
@@ -5741,8 +5742,6 @@ version = "0.1.0-alpha.4"
dependencies = [
"hyper",
"jsonrpsee",
"pin-project",
"rayon",
"reth-beacon-consensus",
"reth-interfaces",
"reth-ipc",

View File

@@ -34,9 +34,6 @@ strum = { workspace = true, features = ["derive"] }
serde = { workspace = true, features = ["derive"] }
thiserror.workspace = true
tracing.workspace = true
rayon.workspace = true
pin-project.workspace = true
tokio = { workspace = true, features = ["sync"] }
[dev-dependencies]
reth-tracing = { path = "../../tracing" }

View File

@@ -17,7 +17,7 @@ use reth_provider::{
use reth_rpc::{
eth::{cache::EthStateCache, gas_oracle::GasPriceOracle},
AuthLayer, Claims, EngineEthApi, EthApi, EthFilter, EthSubscriptionIdProvider,
JwtAuthValidator, JwtSecret,
JwtAuthValidator, JwtSecret, TracingCallPool,
};
use reth_rpc_api::{servers::*, EngineApiServer};
use reth_tasks::TaskSpawner;
@@ -64,6 +64,7 @@ where
gas_oracle,
EthConfig::default().rpc_gas_cap,
Box::new(executor.clone()),
TracingCallPool::build().expect("failed to build tracing pool"),
);
let eth_filter = EthFilter::new(
provider,

View File

@@ -4,7 +4,7 @@ use reth_rpc::{
gas_oracle::GasPriceOracleConfig,
RPC_DEFAULT_GAS_CAP,
},
EthApi, EthFilter, EthPubSub,
EthApi, EthFilter, EthPubSub, TracingCallPool,
};
use serde::{Deserialize, Serialize};
@@ -25,6 +25,8 @@ pub struct EthHandlers<Provider, Pool, Network, Events> {
pub filter: EthFilter<Provider, Pool>,
/// Handler for subscriptions only available for transports that support it (ws, ipc)
pub pubsub: EthPubSub<Provider, Pool, Events, Network>,
/// The configured tracing call pool
pub tracing_call_pool: TracingCallPool,
}
/// Additional config values for the eth namespace

View File

@@ -122,7 +122,8 @@ use reth_rpc::{
gas_oracle::GasPriceOracle,
},
AdminApi, DebugApi, EngineEthApi, EthApi, EthFilter, EthPubSub, EthSubscriptionIdProvider,
NetApi, OtterscanApi, RPCApi, RethApi, TraceApi, TracingCallGuard, TxPoolApi, Web3Api,
NetApi, OtterscanApi, RPCApi, RethApi, TraceApi, TracingCallGuard, TracingCallPool, TxPoolApi,
Web3Api,
};
use reth_rpc_api::{servers::*, EngineApiServer};
use reth_tasks::{TaskSpawner, TokioTaskExecutor};
@@ -154,9 +155,6 @@ mod eth;
/// Common RPC constants.
pub mod constants;
/// Additional support for tracing related rpc calls
pub mod tracing_pool;
// Rpc server metrics
mod metrics;
@@ -816,15 +814,9 @@ where
let eth = self.eth_handlers();
self.modules.insert(
RethRpcModule::Trace,
TraceApi::new(
self.provider.clone(),
eth.api.clone(),
eth.cache,
Box::new(self.executor.clone()),
self.tracing_call_guard.clone(),
)
.into_rpc()
.into(),
TraceApi::new(self.provider.clone(), eth.api.clone(), self.tracing_call_guard.clone())
.into_rpc()
.into(),
);
self
}
@@ -895,8 +887,13 @@ where
&mut self,
namespaces: impl Iterator<Item = RethRpcModule>,
) -> Vec<Methods> {
let EthHandlers { api: eth_api, cache: eth_cache, filter: eth_filter, pubsub: eth_pubsub } =
self.with_eth(|eth| eth.clone());
let EthHandlers {
api: eth_api,
filter: eth_filter,
pubsub: eth_pubsub,
cache: _,
tracing_call_pool: _,
} = self.with_eth(|eth| eth.clone());
// Create a copy, so we can list out all the methods for rpc_ api
let namespaces: Vec<_> = namespaces.collect();
@@ -933,8 +930,6 @@ where
RethRpcModule::Trace => TraceApi::new(
self.provider.clone(),
eth_api.clone(),
eth_cache.clone(),
Box::new(self.executor.clone()),
self.tracing_call_guard.clone(),
)
.into_rpc()
@@ -997,6 +992,7 @@ where
);
let executor = Box::new(self.executor.clone());
let tracing_call_pool = TracingCallPool::build().expect("failed to build tracing pool");
let api = EthApi::with_spawner(
self.provider.clone(),
self.pool.clone(),
@@ -1005,6 +1001,7 @@ where
gas_oracle,
self.config.eth.rpc_gas_cap,
executor.clone(),
tracing_call_pool.clone(),
);
let filter = EthFilter::new(
self.provider.clone(),
@@ -1022,7 +1019,7 @@ where
executor,
);
let eth = EthHandlers { api, cache, filter, pubsub };
let eth = EthHandlers { api, cache, filter, pubsub, tracing_call_pool };
self.eth = Some(eth);
}
f(self.eth.as_ref().expect("exists; qed"))

View File

@@ -48,6 +48,7 @@ tower = "0.4"
tokio-stream = { workspace = true, features = ["sync"] }
tokio-util = "0.7"
pin-project.workspace = true
rayon.workspace = true
bytes.workspace = true
secp256k1 = { workspace = true, features = ["global-context", "rand-std", "recovery"] }

View File

@@ -1,26 +0,0 @@
use std::sync::Arc;
use tokio::sync::{AcquireError, OwnedSemaphorePermit, Semaphore};
/// RPC Tracing call guard semaphore.
///
/// This is used to restrict the number of concurrent RPC requests to tracing methods like
/// `debug_traceTransaction` because they can consume a lot of memory and CPU.
#[derive(Clone, Debug)]
pub struct TracingCallGuard(Arc<Semaphore>);
impl TracingCallGuard {
/// Create a new `TracingCallGuard` with the given maximum number of tracing calls in parallel.
pub fn new(max_tracing_requests: u32) -> Self {
Self(Arc::new(Semaphore::new(max_tracing_requests as usize)))
}
/// See also [Semaphore::acquire_owned]
pub async fn acquire_owned(self) -> Result<OwnedSemaphorePermit, AcquireError> {
self.0.acquire_owned().await
}
/// See also [Semaphore::acquire_many_owned]
pub async fn acquire_many_owned(self, n: u32) -> Result<OwnedSemaphorePermit, AcquireError> {
self.0.acquire_many_owned(n).await
}
}

View File

@@ -40,8 +40,8 @@ use revm_primitives::{
db::{DatabaseCommit, DatabaseRef},
BlockEnv, CfgEnv,
};
use std::{future::Future, sync::Arc};
use tokio::sync::{mpsc, oneshot, AcquireError, OwnedSemaphorePermit};
use std::sync::Arc;
use tokio::sync::{mpsc, AcquireError, OwnedSemaphorePermit};
use tokio_stream::{wrappers::ReceiverStream, StreamExt};
/// `debug` API implementation.
@@ -74,63 +74,11 @@ where
Provider: BlockReaderIdExt + HeaderProvider + 'static,
Eth: EthTransactions + 'static,
{
/// Executes the future on a new blocking task.
async fn on_blocking_task<C, F, R>(&self, c: C) -> EthResult<R>
where
C: FnOnce(Self) -> F,
F: Future<Output = EthResult<R>> + Send + 'static,
R: Send + 'static,
{
let (tx, rx) = oneshot::channel();
let this = self.clone();
let f = c(this);
self.inner.task_spawner.spawn_blocking(Box::pin(async move {
let res = f.await;
let _ = tx.send(res);
}));
rx.await.map_err(|_| EthApiError::InternalTracingError)?
}
/// Acquires a permit to execute a tracing call.
async fn acquire_trace_permit(&self) -> Result<OwnedSemaphorePermit, AcquireError> {
self.inner.tracing_call_guard.clone().acquire_owned().await
}
/// Trace the entire block
fn trace_block_with_sync(
&self,
at: BlockId,
transactions: Vec<TransactionSigned>,
cfg: CfgEnv,
block_env: BlockEnv,
opts: GethDebugTracingOptions,
) -> EthResult<Vec<TraceResult>> {
// replay all transactions of the block
let this = self.clone();
self.inner.eth_api.with_state_at_block(at, move |state| {
let mut results = Vec::with_capacity(transactions.len());
let mut db = SubState::new(State::new(state));
let mut transactions = transactions.into_iter().peekable();
while let Some(tx) = transactions.next() {
let tx = tx.into_ecrecovered().ok_or(BlockError::InvalidSignature)?;
let tx = tx_env_with_recovered(&tx);
let env = Env { cfg: cfg.clone(), block: block_env.clone(), tx };
let (result, state_changes) =
this.trace_transaction(opts.clone(), env, at, &mut db)?;
results.push(TraceResult::Success { result });
if transactions.peek().is_some() {
// need to apply the state changes of this transaction before executing the next
// transaction
db.commit(state_changes)
}
}
Ok(results)
})
}
/// Trace the entire block asynchronously
async fn trace_block_with(
&self,
@@ -140,10 +88,33 @@ where
block_env: BlockEnv,
opts: GethDebugTracingOptions,
) -> EthResult<Vec<TraceResult>> {
self.on_blocking_task(|this| async move {
this.trace_block_with_sync(at, transactions, cfg, block_env, opts)
})
.await
// replay all transactions of the block
let this = self.clone();
self.inner
.eth_api
.spawn_with_state_at_block(at, move |state| {
let mut results = Vec::with_capacity(transactions.len());
let mut db = SubState::new(State::new(state));
let mut transactions = transactions.into_iter().peekable();
while let Some(tx) = transactions.next() {
let tx = tx.into_ecrecovered().ok_or(BlockError::InvalidSignature)?;
let tx = tx_env_with_recovered(&tx);
let env = Env { cfg: cfg.clone(), block: block_env.clone(), tx };
let (result, state_changes) =
this.trace_transaction(opts.clone(), env, at, &mut db)?;
results.push(TraceResult::Success { result });
if transactions.peek().is_some() {
// need to apply the state changes of this transaction before executing the
// next transaction
db.commit(state_changes)
}
}
Ok(results)
})
.await
}
/// Replays the given block and returns the trace of each transaction.
@@ -171,17 +142,6 @@ where
&self,
block_id: BlockId,
opts: GethDebugTracingOptions,
) -> EthResult<Vec<TraceResult>> {
self.on_blocking_task(
|this| async move { this.try_debug_trace_block(block_id, opts).await },
)
.await
}
async fn try_debug_trace_block(
&self,
block_id: BlockId,
opts: GethDebugTracingOptions,
) -> EthResult<Vec<TraceResult>> {
let block_hash = self
.inner
@@ -199,7 +159,7 @@ where
// its parent block's state
let state_at = block.parent_hash;
self.trace_block_with_sync(state_at.into(), block.body, cfg, block_env, opts)
self.trace_block_with(state_at.into(), block.body, cfg, block_env, opts).await
}
/// Trace the transaction according to the provided options.
@@ -221,8 +181,10 @@ where
let state_at: BlockId = block.parent_hash.into();
let block_txs = block.body;
self.on_blocking_task(|this| async move {
this.inner.eth_api.with_state_at_block(state_at, |state| {
let this = self.clone();
self.inner
.eth_api
.spawn_with_state_at_block(state_at, move |state| {
// configure env for the target transaction
let tx = transaction.into_recovered();
@@ -239,8 +201,7 @@ where
let env = Env { cfg, block: block_env, tx: tx_env_with_recovered(&tx) };
this.trace_transaction(opts, env, state_at, &mut db).map(|(trace, _)| trace)
})
})
.await
.await
}
/// The debug_traceCall method lets you run an `eth_call` within the context of the given block
@@ -250,22 +211,6 @@ where
call: CallRequest,
block_id: Option<BlockId>,
opts: GethDebugTracingCallOptions,
) -> EthResult<GethTrace> {
self.on_blocking_task(|this| async move {
this.try_debug_trace_call(call, block_id, opts).await
})
.await
}
/// The debug_traceCall method lets you run an `eth_call` within the context of the given block
/// execution using the final state of parent block as the base.
///
/// Caution: while this is async, this may still be blocking on necessary DB io.
async fn try_debug_trace_call(
&self,
call: CallRequest,
block_id: Option<BlockId>,
opts: GethDebugTracingCallOptions,
) -> EthResult<GethTrace> {
let at = block_id.unwrap_or(BlockId::Number(BlockNumberOrTag::Latest));
let GethDebugTracingCallOptions { tracing_options, state_overrides, block_overrides } =
@@ -278,10 +223,13 @@ where
GethDebugTracerType::BuiltInTracer(tracer) => match tracer {
GethDebugBuiltInTracerType::FourByteTracer => {
let mut inspector = FourByteInspector::default();
let (_res, _) = self
let inspector = self
.inner
.eth_api
.inspect_call_at(call, at, overrides, &mut inspector)
.spawn_with_call_at(call, at, overrides, move |db, env| {
inspect(db, env, &mut inspector)?;
Ok(inspector)
})
.await?;
return Ok(FourByteFrame::from(inspector).into())
}
@@ -295,10 +243,13 @@ where
.set_record_logs(call_config.with_log.unwrap_or_default()),
);
let _ = self
let inspector = self
.inner
.eth_api
.inspect_call_at(call, at, overrides, &mut inspector)
.spawn_with_call_at(call, at, overrides, move |db, env| {
inspect(db, env, &mut inspector)?;
Ok(inspector)
})
.await?;
let frame = inspector.into_geth_builder().geth_call_traces(call_config);
@@ -351,8 +302,14 @@ where
let mut inspector = TracingInspector::new(inspector_config);
let (res, _) =
self.inner.eth_api.inspect_call_at(call, at, overrides, &mut inspector).await?;
let (res, inspector) = self
.inner
.eth_api
.spawn_with_call_at(call, at, overrides, move |db, env| {
let (res, _) = inspect(db, env, &mut inspector)?;
Ok((res, inspector))
})
.await?;
let gas_used = res.result.gas_used();
let return_value = result_output(&res.result).unwrap_or_default().into();
let frame = inspector.into_geth_builder().geth_traces(gas_used, return_value, config);
@@ -365,6 +322,8 @@ where
/// Returns the trace frame and the state that got updated after executing the transaction.
///
/// Note: this does not apply any state overrides if they're configured in the `opts`.
///
/// Caution: this is blocking and should be performed on a blocking task.
fn trace_transaction(
&self,
opts: GethDebugTracingOptions,

View File

@@ -37,6 +37,7 @@ mod sign;
mod state;
mod transactions;
use crate::TracingCallPool;
pub use transactions::{EthTransactions, TransactionSource};
/// `Eth` API trait.
@@ -88,6 +89,7 @@ where
eth_cache: EthStateCache,
gas_oracle: GasPriceOracle<Provider>,
gas_cap: impl Into<GasCap>,
tracing_call_pool: TracingCallPool,
) -> Self {
Self::with_spawner(
provider,
@@ -97,10 +99,12 @@ where
gas_oracle,
gas_cap.into().into(),
Box::<TokioTaskExecutor>::default(),
tracing_call_pool,
)
}
/// Creates a new, shareable instance.
#[allow(clippy::too_many_arguments)]
pub fn with_spawner(
provider: Provider,
pool: Pool,
@@ -109,6 +113,7 @@ where
gas_oracle: GasPriceOracle<Provider>,
gas_cap: u64,
task_spawner: Box<dyn TaskSpawner>,
tracing_call_pool: TracingCallPool,
) -> Self {
// get the block number of the latest block
let latest_block = provider
@@ -129,6 +134,7 @@ where
starting_block: U256::from(latest_block),
task_spawner,
pending_block: Default::default(),
tracing_call_pool,
};
Self { inner: Arc::new(inner) }
}
@@ -421,4 +427,6 @@ struct EthApiInner<Provider, Pool, Network> {
task_spawner: Box<dyn TaskSpawner>,
/// Cached pending block if any
pending_block: Mutex<Option<PendingBlock>>,
/// A pool dedicated to tracing calls
tracing_call_pool: TracingCallPool,
}

View File

@@ -392,7 +392,7 @@ where
mod tests {
use crate::{
eth::{cache::EthStateCache, gas_oracle::GasPriceOracle},
EthApi,
EthApi, TracingCallPool,
};
use jsonrpsee::types::error::INVALID_PARAMS_CODE;
use reth_interfaces::test_utils::{generators, generators::Rng};
@@ -428,6 +428,7 @@ mod tests {
cache.clone(),
GasPriceOracle::new(provider, Default::default(), cache),
ETHEREUM_BLOCK_GAS_LIMIT,
TracingCallPool::build().expect("failed to build tracing pool"),
)
}

View File

@@ -146,7 +146,10 @@ where
#[cfg(test)]
mod tests {
use super::*;
use crate::eth::{cache::EthStateCache, gas_oracle::GasPriceOracle};
use crate::{
eth::{cache::EthStateCache, gas_oracle::GasPriceOracle},
TracingCallPool,
};
use reth_primitives::{constants::ETHEREUM_BLOCK_GAS_LIMIT, StorageKey, StorageValue};
use reth_provider::test_utils::{ExtendedAccount, MockEthProvider, NoopProvider};
use reth_transaction_pool::test_utils::testing_pool;
@@ -165,6 +168,7 @@ mod tests {
cache.clone(),
GasPriceOracle::new(NoopProvider::default(), Default::default(), cache),
ETHEREUM_BLOCK_GAS_LIMIT,
TracingCallPool::build().expect("failed to build tracing pool"),
);
let address = Address::random();
let storage = eth_api.storage_at(address, U256::ZERO.into(), None).unwrap();
@@ -186,6 +190,7 @@ mod tests {
cache.clone(),
GasPriceOracle::new(mock_provider, Default::default(), cache),
ETHEREUM_BLOCK_GAS_LIMIT,
TracingCallPool::build().expect("failed to build tracing pool"),
);
let storage_key: U256 = storage_key.into();

View File

@@ -40,7 +40,10 @@ use revm_primitives::{utilities::create_address, Env, ResultAndState, SpecId};
/// Helper alias type for the state's [CacheDB]
pub(crate) type StateCacheDB<'r> = CacheDB<State<StateProviderBox<'r>>>;
/// Commonly used transaction related functions for the [EthApi] type in the `eth_` namespace
/// Commonly used transaction related functions for the [EthApi] type in the `eth_` namespace.
///
/// Async functions that are spawned onto the
/// [TracingCallPool](crate::tracing_call::TracingCallPool) begin with `spawn_`
#[async_trait::async_trait]
pub trait EthTransactions: Send + Sync {
/// Returns default gas limit to use for `eth_call` and tracing RPC methods.
@@ -54,6 +57,12 @@ pub trait EthTransactions: Send + Sync {
where
F: FnOnce(StateProviderBox<'_>) -> EthResult<T>;
/// Executes the closure with the state that corresponds to the given [BlockId] on a new task
async fn spawn_with_state_at_block<F, T>(&self, at: BlockId, f: F) -> EthResult<T>
where
F: FnOnce(StateProviderBox<'_>) -> EthResult<T> + Send + 'static,
T: Send + 'static;
/// Returns the revm evm env for the requested [BlockId]
///
/// If the [BlockId] this will return the [BlockId::Hash] of the block the env was configured
@@ -121,8 +130,8 @@ pub trait EthTransactions: Send + Sync {
async fn send_transaction(&self, request: TransactionRequest) -> EthResult<H256>;
/// Prepares the state and env for the given [CallRequest] at the given [BlockId] and executes
/// the closure.
async fn with_call_at<F, R>(
/// the closure on a new task returning the result of the closure.
async fn spawn_with_call_at<F, R>(
&self,
request: CallRequest,
at: BlockId,
@@ -130,7 +139,8 @@ pub trait EthTransactions: Send + Sync {
f: F,
) -> EthResult<R>
where
F: for<'r> FnOnce(StateCacheDB<'r>, Env) -> EthResult<R> + Send;
F: for<'r> FnOnce(StateCacheDB<'r>, Env) -> EthResult<R> + Send + 'static,
R: Send + 'static;
/// Executes the call request at the given [BlockId].
async fn transact_call_at(
@@ -140,8 +150,9 @@ pub trait EthTransactions: Send + Sync {
overrides: EvmOverrides,
) -> EthResult<(ResultAndState, Env)>;
/// Executes the call request at the given [BlockId]
async fn inspect_call_at<I>(
/// Executes the call request at the given [BlockId] on a new task and returns the result of the
/// inspect call.
async fn spawn_inspect_call_at<I>(
&self,
request: CallRequest,
at: BlockId,
@@ -149,24 +160,15 @@ pub trait EthTransactions: Send + Sync {
inspector: I,
) -> EthResult<(ResultAndState, Env)>
where
I: for<'r> Inspector<StateCacheDB<'r>> + Send;
/// Executes the call request at the given [BlockId]
async fn inspect_call_at_and_return_state<'a, I>(
&'a self,
request: CallRequest,
at: BlockId,
overrides: EvmOverrides,
inspector: I,
) -> EthResult<(ResultAndState, Env, StateCacheDB<'a>)>
where
I: Inspector<StateCacheDB<'a>> + Send;
I: for<'r> Inspector<StateCacheDB<'r>> + Send + 'static;
/// Executes the transaction on top of the given [BlockId] with a tracer configured by the
/// config.
///
/// The callback is then called with the [TracingInspector] and the [ResultAndState] after the
/// configured [Env] was inspected.
///
/// Caution: this is blocking
fn trace_at<F, R>(
&self,
env: Env,
@@ -184,7 +186,7 @@ pub trait EthTransactions: Send + Sync {
///
/// The callback is then called with the [TracingInspector] and the [ResultAndState] after the
/// configured [Env] was inspected.
fn trace_at_with_state<F, R>(
async fn spawn_trace_at_with_state<F, R>(
&self,
env: Env,
config: TracingInspectorConfig,
@@ -192,7 +194,10 @@ pub trait EthTransactions: Send + Sync {
f: F,
) -> EthResult<R>
where
F: for<'a> FnOnce(TracingInspector, ResultAndState, StateCacheDB<'a>) -> EthResult<R>;
F: for<'a> FnOnce(TracingInspector, ResultAndState, StateCacheDB<'a>) -> EthResult<R>
+ Send
+ 'static,
R: Send + 'static;
/// Fetches the transaction and the transaction's block
async fn transaction_and_block(
@@ -206,7 +211,10 @@ pub trait EthTransactions: Send + Sync {
/// state by executing them first.
/// The callback `f` is invoked with the [ResultAndState] after the transaction was executed and
/// the database that points to the beginning of the transaction.
async fn trace_transaction_in_block<F, R>(
///
/// Note: Implementers should use a threadpool where blocking is allowed, such as
/// [TracingCallPool](crate::tracing_call::TracingCallPool).
async fn spawn_trace_transaction_in_block<F, R>(
&self,
hash: H256,
config: TracingInspectorConfig,
@@ -219,7 +227,9 @@ pub trait EthTransactions: Send + Sync {
ResultAndState,
StateCacheDB<'a>,
) -> EthResult<R>
+ Send;
+ Send
+ 'static,
R: Send + 'static;
}
#[async_trait]
@@ -245,6 +255,22 @@ where
f(state)
}
async fn spawn_with_state_at_block<F, T>(&self, at: BlockId, f: F) -> EthResult<T>
where
F: FnOnce(StateProviderBox<'_>) -> EthResult<T> + Send + 'static,
T: Send + 'static,
{
let this = self.clone();
self.inner
.tracing_call_pool
.spawn(move || {
let state = this.state_at(at)?;
f(state)
})
.await
.map_err(|_| EthApiError::InternalTracingError)?
}
async fn evm_env_at(&self, at: BlockId) -> EthResult<(CfgEnv, BlockEnv, BlockId)> {
if at.is_pending() {
let PendingBlockEnv { cfg, block_env, origin } = self.pending_block_env_and_cfg()?;
@@ -473,7 +499,7 @@ where
Ok(hash)
}
async fn with_call_at<F, R>(
async fn spawn_with_call_at<F, R>(
&self,
request: CallRequest,
at: BlockId,
@@ -481,15 +507,29 @@ where
f: F,
) -> EthResult<R>
where
F: for<'r> FnOnce(StateCacheDB<'r>, Env) -> EthResult<R> + Send,
F: for<'r> FnOnce(StateCacheDB<'r>, Env) -> EthResult<R> + Send + 'static,
R: Send + 'static,
{
let (cfg, block_env, at) = self.evm_env_at(at).await?;
let state = self.state_at(at)?;
let mut db = SubState::new(State::new(state));
let this = self.clone();
self.inner
.tracing_call_pool
.spawn(move || {
let state = this.state_at(at)?;
let mut db = SubState::new(State::new(state));
let env =
prepare_call_env(cfg, block_env, request, self.call_gas_limit(), &mut db, overrides)?;
f(db, env)
let env = prepare_call_env(
cfg,
block_env,
request,
this.call_gas_limit(),
&mut db,
overrides,
)?;
f(db, env)
})
.await
.map_err(|_| EthApiError::InternalTracingError)?
}
async fn transact_call_at(
@@ -498,10 +538,11 @@ where
at: BlockId,
overrides: EvmOverrides,
) -> EthResult<(ResultAndState, Env)> {
self.with_call_at(request, at, overrides, |mut db, env| transact(&mut db, env)).await
self.spawn_with_call_at(request, at, overrides, move |mut db, env| transact(&mut db, env))
.await
}
async fn inspect_call_at<I>(
async fn spawn_inspect_call_at<I>(
&self,
request: CallRequest,
at: BlockId,
@@ -509,28 +550,10 @@ where
inspector: I,
) -> EthResult<(ResultAndState, Env)>
where
I: for<'r> Inspector<StateCacheDB<'r>> + Send,
I: for<'r> Inspector<StateCacheDB<'r>> + Send + 'static,
{
self.with_call_at(request, at, overrides, |db, env| inspect(db, env, inspector)).await
}
async fn inspect_call_at_and_return_state<'a, I>(
&'a self,
request: CallRequest,
at: BlockId,
overrides: EvmOverrides,
inspector: I,
) -> EthResult<(ResultAndState, Env, StateCacheDB<'a>)>
where
I: Inspector<StateCacheDB<'a>> + Send,
{
let (cfg, block_env, at) = self.evm_env_at(at).await?;
let state = self.state_at(at)?;
let mut db = SubState::new(State::new(state));
let env =
prepare_call_env(cfg, block_env, request, self.call_gas_limit(), &mut db, overrides)?;
inspect_and_return_db(db, env, inspector)
self.spawn_with_call_at(request, at, overrides, move |db, env| inspect(db, env, inspector))
.await
}
fn trace_at<F, R>(
@@ -553,7 +576,7 @@ where
})
}
fn trace_at_with_state<F, R>(
async fn spawn_trace_at_with_state<F, R>(
&self,
env: Env,
config: TracingInspectorConfig,
@@ -561,15 +584,19 @@ where
f: F,
) -> EthResult<R>
where
F: for<'a> FnOnce(TracingInspector, ResultAndState, StateCacheDB<'a>) -> EthResult<R>,
F: for<'a> FnOnce(TracingInspector, ResultAndState, StateCacheDB<'a>) -> EthResult<R>
+ Send
+ 'static,
R: Send + 'static,
{
self.with_state_at_block(at, |state| {
self.spawn_with_state_at_block(at, move |state| {
let db = SubState::new(State::new(state));
let mut inspector = TracingInspector::new(config);
let (res, _, db) = inspect_and_return_db(db, env, &mut inspector)?;
f(inspector, res, db)
})
.await
}
async fn transaction_and_block(
@@ -590,7 +617,7 @@ where
Ok(block.map(|block| (transaction, block.seal(block_hash))))
}
async fn trace_transaction_in_block<F, R>(
async fn spawn_trace_transaction_in_block<F, R>(
&self,
hash: H256,
config: TracingInspectorConfig,
@@ -603,7 +630,9 @@ where
ResultAndState,
StateCacheDB<'a>,
) -> EthResult<R>
+ Send,
+ Send
+ 'static,
R: Send + 'static,
{
let (transaction, block) = match self.transaction_and_block(hash).await? {
None => return Ok(None),
@@ -618,7 +647,7 @@ where
let parent_block = block.parent_hash;
let block_txs = block.body;
self.with_state_at_block(parent_block.into(), |state| {
self.spawn_with_state_at_block(parent_block.into(), move |state| {
let mut db = SubState::new(State::new(state));
// replay all transactions prior to the targeted transaction
@@ -630,6 +659,7 @@ where
let (res, _, db) = inspect_and_return_db(db, env, &mut inspector)?;
f(tx_info, inspector, res, db)
})
.await
.map(Some)
}
}
@@ -878,7 +908,7 @@ mod tests {
use super::*;
use crate::{
eth::{cache::EthStateCache, gas_oracle::GasPriceOracle},
EthApi,
EthApi, TracingCallPool,
};
use reth_network_api::noop::NoopNetwork;
use reth_primitives::{constants::ETHEREUM_BLOCK_GAS_LIMIT, hex_literal::hex, Bytes};
@@ -900,6 +930,7 @@ mod tests {
cache.clone(),
GasPriceOracle::new(noop_provider, Default::default(), cache),
ETHEREUM_BLOCK_GAS_LIMIT,
TracingCallPool::build().expect("failed to build tracing pool"),
);
// https://etherscan.io/tx/0xa694b71e6c128a2ed8e2e0f6770bddbe52e3bb8f10e8472f9a79ab81497a8b5d

View File

@@ -31,7 +31,6 @@
//! disk-io, hence these calls are spawned as futures to a blocking task manually.
mod admin;
mod call_guard;
mod debug;
mod engine;
pub mod eth;
@@ -41,11 +40,11 @@ mod otterscan;
mod reth;
mod rpc;
mod trace;
pub mod tracing_call;
mod txpool;
mod web3;
pub use admin::AdminApi;
pub use call_guard::TracingCallGuard;
pub use debug::DebugApi;
pub use engine::{EngineApi, EngineEthApi};
pub use eth::{EthApi, EthApiSpec, EthFilter, EthPubSub, EthSubscriptionIdProvider};
@@ -55,6 +54,7 @@ pub use otterscan::OtterscanApi;
pub use reth::RethApi;
pub use rpc::RPCApi;
pub use trace::TraceApi;
pub use tracing_call::{TracingCallGuard, TracingCallPool};
pub use txpool::TxPoolApi;
pub use web3::Web3Api;

View File

@@ -1,8 +1,7 @@
use crate::{
eth::{
cache::EthStateCache,
error::{EthApiError, EthResult},
revm_utils::{inspect, prepare_call_env, EvmOverrides},
revm_utils::{inspect, inspect_and_return_db, prepare_call_env, EvmOverrides},
utils::recover_raw_transaction,
EthTransactions,
},
@@ -29,11 +28,10 @@ use reth_rpc_types::{
trace::{filter::TraceFilter, parity::*},
BlockError, BlockOverrides, CallRequest, Index, TransactionInfo,
};
use reth_tasks::TaskSpawner;
use revm::{db::CacheDB, primitives::Env};
use revm_primitives::{db::DatabaseCommit, ExecutionResult, ResultAndState};
use std::{collections::HashSet, future::Future, sync::Arc};
use tokio::sync::{oneshot, AcquireError, OwnedSemaphorePermit};
use std::{collections::HashSet, sync::Arc};
use tokio::sync::{AcquireError, OwnedSemaphorePermit};
/// `trace` API implementation.
///
@@ -51,20 +49,8 @@ impl<Provider, Eth> TraceApi<Provider, Eth> {
}
/// Create a new instance of the [TraceApi]
pub fn new(
provider: Provider,
eth_api: Eth,
eth_cache: EthStateCache,
task_spawner: Box<dyn TaskSpawner>,
tracing_call_guard: TracingCallGuard,
) -> Self {
let inner = Arc::new(TraceApiInner {
provider,
eth_api,
eth_cache,
task_spawner,
tracing_call_guard,
});
pub fn new(provider: Provider, eth_api: Eth, tracing_call_guard: TracingCallGuard) -> Self {
let inner = Arc::new(TraceApiInner { provider, eth_api, tracing_call_guard });
Self { inner }
}
@@ -83,23 +69,6 @@ where
Provider: BlockReader + StateProviderFactory + EvmEnvProvider + ChainSpecProvider + 'static,
Eth: EthTransactions + 'static,
{
/// Executes the future on a new blocking task.
async fn on_blocking_task<C, F, R>(&self, c: C) -> EthResult<R>
where
C: FnOnce(Self) -> F,
F: Future<Output = EthResult<R>> + Send + 'static,
R: Send + 'static,
{
let (tx, rx) = oneshot::channel();
let this = self.clone();
let f = c(this);
self.inner.task_spawner.spawn_blocking(Box::pin(async move {
let res = f.await;
let _ = tx.send(res);
}));
rx.await.map_err(|_| EthApiError::InternalTracingError)?
}
/// Executes the given call and returns a number of possible traces for it.
pub async fn trace_call(
&self,
@@ -108,43 +77,23 @@ where
block_id: Option<BlockId>,
state_overrides: Option<StateOverride>,
block_overrides: Option<Box<BlockOverrides>>,
) -> EthResult<TraceResults> {
self.on_blocking_task(|this| async move {
this.try_trace_call(
call,
trace_types,
block_id,
EvmOverrides::new(state_overrides, block_overrides),
)
.await
})
.await
}
async fn try_trace_call(
&self,
call: CallRequest,
trace_types: HashSet<TraceType>,
block_id: Option<BlockId>,
overrides: EvmOverrides,
) -> EthResult<TraceResults> {
let at = block_id.unwrap_or(BlockId::Number(BlockNumberOrTag::Latest));
let config = tracing_config(&trace_types);
let overrides = EvmOverrides::new(state_overrides, block_overrides);
let mut inspector = TracingInspector::new(config);
let (res, _, db) = self
.inner
self.inner
.eth_api
.inspect_call_at_and_return_state(call, at, overrides, &mut inspector)
.await?;
let trace_res = inspector.into_parity_builder().into_trace_results_with_state(
res,
&trace_types,
&db,
)?;
Ok(trace_res)
.spawn_with_call_at(call, at, overrides, move |db, env| {
let (res, _, db) = inspect_and_return_db(db, env, &mut inspector)?;
let trace_res = inspector.into_parity_builder().into_trace_results_with_state(
res,
&trace_types,
&db,
)?;
Ok(trace_res)
})
.await
}
/// Traces a call to `eth_sendRawTransaction` without making the call, returning the traces.
@@ -166,16 +115,16 @@ where
let config = tracing_config(&trace_types);
self.on_blocking_task(|this| async move {
this.inner.eth_api.trace_at_with_state(env, config, at, |inspector, res, db| {
self.inner
.eth_api
.spawn_trace_at_with_state(env, config, at, move |inspector, res, db| {
Ok(inspector.into_parity_builder().into_trace_results_with_state(
res,
&trace_types,
&db,
)?)
})
})
.await
.await
}
/// Performs multiple call traces on top of the same block. i.e. transaction n will be executed
@@ -190,10 +139,11 @@ where
let at = block_id.unwrap_or(BlockId::Number(BlockNumberOrTag::Pending));
let (cfg, block_env, at) = self.inner.eth_api.evm_env_at(at).await?;
self.on_blocking_task(|this| async move {
let gas_limit = this.inner.eth_api.call_gas_limit();
// execute all transactions on top of each other and record the traces
this.inner.eth_api.with_state_at_block(at, move |state| {
let gas_limit = self.inner.eth_api.call_gas_limit();
// execute all transactions on top of each other and record the traces
self.inner
.eth_api
.spawn_with_state_at_block(at, move |state| {
let mut results = Vec::with_capacity(calls.len());
let mut db = SubState::new(State::new(state));
@@ -239,8 +189,7 @@ where
Ok(results)
})
})
.await
.await
}
/// Replays a transaction, returning the traces.
@@ -250,22 +199,19 @@ where
trace_types: HashSet<TraceType>,
) -> EthResult<TraceResults> {
let config = tracing_config(&trace_types);
self.on_blocking_task(|this| async move {
this.inner
.eth_api
.trace_transaction_in_block(hash, config, |_, inspector, res, db| {
let trace_res = inspector.into_parity_builder().into_trace_results_with_state(
res,
&trace_types,
&db,
)?;
Ok(trace_res)
})
.await
.transpose()
.ok_or_else(|| EthApiError::TransactionNotFound)?
})
.await
self.inner
.eth_api
.spawn_trace_transaction_in_block(hash, config, move |_, inspector, res, db| {
let trace_res = inspector.into_parity_builder().into_trace_results_with_state(
res,
&trace_types,
&db,
)?;
Ok(trace_res)
})
.await
.transpose()
.ok_or_else(|| EthApiError::TransactionNotFound)?
}
/// Returns transaction trace objects at the given index
@@ -308,22 +254,18 @@ where
&self,
hash: H256,
) -> EthResult<Option<Vec<LocalizedTransactionTrace>>> {
self.on_blocking_task(|this| async move {
this.inner
.eth_api
.trace_transaction_in_block(
hash,
TracingInspectorConfig::default_parity(),
|tx_info, inspector, _, _| {
let traces = inspector
.into_parity_builder()
.into_localized_transaction_traces(tx_info);
Ok(traces)
},
)
.await
})
.await
self.inner
.eth_api
.spawn_trace_transaction_in_block(
hash,
TracingInspectorConfig::default_parity(),
move |tx_info, inspector, _, _| {
let traces =
inspector.into_parity_builder().into_localized_transaction_traces(tx_info);
Ok(traces)
},
)
.await
}
/// Executes all transactions of a block and returns a list of callback results.
@@ -371,48 +313,46 @@ where
let block_hash = block.hash;
let transactions = block.body;
self.on_blocking_task(|this| async move {
// replay all transactions of the block
this.inner
.eth_api
.with_state_at_block(state_at.into(), move |state| {
let mut results = Vec::with_capacity(transactions.len());
let mut db = SubState::new(State::new(state));
// replay all transactions of the block
self.inner
.eth_api
.spawn_with_state_at_block(state_at.into(), move |state| {
let mut results = Vec::with_capacity(transactions.len());
let mut db = SubState::new(State::new(state));
let mut transactions = transactions.into_iter().enumerate().peekable();
let mut transactions = transactions.into_iter().enumerate().peekable();
while let Some((idx, tx)) = transactions.next() {
let tx = tx.into_ecrecovered().ok_or(BlockError::InvalidSignature)?;
let tx_info = TransactionInfo {
hash: Some(tx.hash()),
index: Some(idx as u64),
block_hash: Some(block_hash),
block_number: Some(block_env.number.try_into().unwrap_or(u64::MAX)),
base_fee: Some(block_env.basefee.try_into().unwrap_or(u64::MAX)),
};
while let Some((idx, tx)) = transactions.next() {
let tx = tx.into_ecrecovered().ok_or(BlockError::InvalidSignature)?;
let tx_info = TransactionInfo {
hash: Some(tx.hash()),
index: Some(idx as u64),
block_hash: Some(block_hash),
block_number: Some(block_env.number.try_into().unwrap_or(u64::MAX)),
base_fee: Some(block_env.basefee.try_into().unwrap_or(u64::MAX)),
};
let tx = tx_env_with_recovered(&tx);
let env = Env { cfg: cfg.clone(), block: block_env.clone(), tx };
let tx = tx_env_with_recovered(&tx);
let env = Env { cfg: cfg.clone(), block: block_env.clone(), tx };
let mut inspector = TracingInspector::new(config);
let (res, _) = inspect(&mut db, env, &mut inspector)?;
let ResultAndState { result, state } = res;
results.push(f(tx_info, inspector, result, &state, &db)?);
let mut inspector = TracingInspector::new(config);
let (res, _) = inspect(&mut db, env, &mut inspector)?;
let ResultAndState { result, state } = res;
results.push(f(tx_info, inspector, result, &state, &db)?);
// need to apply the state changes of this transaction before executing the
// next transaction
if transactions.peek().is_some() {
// need to apply the state changes of this transaction before executing
// the next transaction
db.commit(state)
}
// need to apply the state changes of this transaction before executing the
// next transaction
if transactions.peek().is_some() {
// need to apply the state changes of this transaction before executing
// the next transaction
db.commit(state)
}
}
Ok(results)
})
.map(Some)
})
.await
Ok(results)
})
.await
.map(Some)
}
/// Returns traces created at given block.
@@ -626,11 +566,6 @@ struct TraceApiInner<Provider, Eth> {
provider: Provider,
/// Access to commonly used code of the `eth` namespace
eth_api: Eth,
/// The async cache frontend for eth-related data
#[allow(unused)] // we need this for trace_filter eventually
eth_cache: EthStateCache,
/// The type that can spawn tasks which would otherwise be blocking.
task_spawner: Box<dyn TaskSpawner>,
// restrict the number of concurrent calls to `trace_*`
tracing_call_guard: TracingCallGuard,
}

View File

@@ -8,9 +8,46 @@ use std::{
task::{ready, Context, Poll},
thread,
};
use tokio::sync::oneshot;
use tokio::sync::{oneshot, AcquireError, OwnedSemaphorePermit, Semaphore};
/// RPC Tracing call guard semaphore.
///
/// This is used to restrict the number of concurrent RPC requests to tracing methods like
/// `debug_traceTransaction` because they can consume a lot of memory and CPU.
///
/// This types serves as an entry guard for the [TracingCallPool] and is used to rate limit parallel
/// tracing calls on the pool.
#[derive(Clone, Debug)]
pub struct TracingCallGuard(Arc<Semaphore>);
impl TracingCallGuard {
/// Create a new `TracingCallGuard` with the given maximum number of tracing calls in parallel.
pub fn new(max_tracing_requests: u32) -> Self {
Self(Arc::new(Semaphore::new(max_tracing_requests as usize)))
}
/// See also [Semaphore::acquire_owned]
pub async fn acquire_owned(self) -> Result<OwnedSemaphorePermit, AcquireError> {
self.0.acquire_owned().await
}
/// See also [Semaphore::acquire_many_owned]
pub async fn acquire_many_owned(self, n: u32) -> Result<OwnedSemaphorePermit, AcquireError> {
self.0.acquire_many_owned(n).await
}
}
/// Used to execute tracing calls on a rayon threadpool from within a tokio runtime.
///
/// This is a dedicated threadpool for tracing calls which are CPU bound.
/// RPC calls that perform blocking IO (disk lookups) are not executed on this pool but on the tokio
/// runtime's blocking pool, which performs poorly with CPU bound tasks. Once the tokio blocking
/// pool is saturated it is converted into a queue, tracing calls could then interfere with the
/// queue and block other RPC calls.
///
/// See also [tokio-docs] for more information.
///
/// [tokio-docs]: https://docs.rs/tokio/latest/tokio/index.html#cpu-bound-tasks-and-blocking-code
#[derive(Clone, Debug)]
pub struct TracingCallPool {
pool: Arc<rayon::ThreadPool>,