feat: add semaphore for blocking IO requests (#20289)

Matthias Seitz
2025-12-11 12:35:50 +01:00
committed by GitHub
parent 28e7c8a7cb
commit 2e567d6658
13 changed files with 224 additions and 11 deletions

View File

@@ -1179,6 +1179,7 @@ impl<'a, N: FullNodeComponents<Types: NodeTypes<ChainSpec: Hardforks + EthereumH
.proof_permits(self.config.proof_permits)
.gas_oracle_config(self.config.gas_oracle)
.max_batch_size(self.config.max_batch_size)
.max_blocking_io_requests(self.config.max_blocking_io_requests)
.pending_block_kind(self.config.pending_block_kind)
.raw_tx_forwarder(self.config.raw_tx_forwarder)
.evm_memory_limit(self.config.rpc_evm_memory_limit)

View File

@@ -37,6 +37,9 @@ pub(crate) const RPC_DEFAULT_MAX_REQUEST_SIZE_MB: u32 = 15;
pub(crate) const RPC_DEFAULT_MAX_RESPONSE_SIZE_MB: u32 = 160;
/// Default number of incoming connections.
///
/// This restricts how many active connections (http, ws) the server accepts.
/// Once this limit is exceeded, the server can reject new connections.
pub(crate) const RPC_DEFAULT_MAX_CONNECTIONS: u32 = 500;
/// Parameters for configuring the rpc more granularity via CLI
@@ -166,6 +169,14 @@ pub struct RpcServerArgs {
#[arg(long = "rpc.max-tracing-requests", alias = "rpc-max-tracing-requests", value_name = "COUNT", default_value_t = constants::default_max_tracing_requests())]
pub rpc_max_tracing_requests: usize,
/// Maximum number of concurrent blocking IO requests.
///
/// Blocking IO requests include `eth_call`, `eth_estimateGas`, and similar methods that
/// require EVM execution. These are spawned as blocking tasks to avoid blocking the async
/// runtime.
#[arg(long = "rpc.max-blocking-io-requests", alias = "rpc-max-blocking-io-requests", value_name = "COUNT", default_value_t = constants::DEFAULT_MAX_BLOCKING_IO_REQUEST)]
pub rpc_max_blocking_io_requests: usize,
/// Maximum number of blocks for `trace_filter` requests.
#[arg(long = "rpc.max-trace-filter-blocks", alias = "rpc-max-trace-filter-blocks", value_name = "COUNT", default_value_t = constants::DEFAULT_MAX_TRACE_FILTER_BLOCKS)]
pub rpc_max_trace_filter_blocks: u64,
@@ -414,6 +425,7 @@ impl Default for RpcServerArgs {
rpc_max_subscriptions_per_connection: RPC_DEFAULT_MAX_SUBS_PER_CONN.into(),
rpc_max_connections: RPC_DEFAULT_MAX_CONNECTIONS.into(),
rpc_max_tracing_requests: constants::default_max_tracing_requests(),
rpc_max_blocking_io_requests: constants::DEFAULT_MAX_BLOCKING_IO_REQUEST,
rpc_max_trace_filter_blocks: constants::DEFAULT_MAX_TRACE_FILTER_BLOCKS,
rpc_max_blocks_per_filter: constants::DEFAULT_MAX_BLOCKS_PER_FILTER.into(),
rpc_max_logs_per_response: (constants::DEFAULT_MAX_LOGS_PER_RESPONSE as u64).into(),
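For reference, a minimal sketch of how the new flag's default surfaces through `RpcServerArgs`: the `Default` impl and the value 256 come from hunks in this commit, while the `reth_node_core` import path is an assumption.

```rust
// Sketch (not part of this commit): the new field defaults to
// DEFAULT_MAX_BLOCKING_IO_REQUEST, i.e. 256, via the Default impl shown above.
use reth_node_core::args::RpcServerArgs; // assumed crate path
use reth_rpc_server_types::constants::DEFAULT_MAX_BLOCKING_IO_REQUEST;

fn main() {
    let args = RpcServerArgs::default();
    assert_eq!(args.rpc_max_blocking_io_requests, DEFAULT_MAX_BLOCKING_IO_REQUEST);
    assert_eq!(DEFAULT_MAX_BLOCKING_IO_REQUEST, 256);
}
```

Overriding the default at runtime goes through the `--rpc.max-blocking-io-requests <COUNT>` flag documented in the CLI help further down.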

View File

@@ -272,6 +272,11 @@ where
fn tracing_task_guard(&self) -> &BlockingTaskGuard {
self.inner.eth_api.blocking_task_guard()
}
#[inline]
fn blocking_io_task_guard(&self) -> &Arc<tokio::sync::Semaphore> {
self.inner.eth_api.blocking_io_request_semaphore()
}
}
impl<N, Rpc> LoadFee for OpEthApi<N, Rpc>

View File

@@ -94,6 +94,7 @@ impl RethRpcServerConfig for RpcServerArgs {
fn eth_config(&self) -> EthConfig {
EthConfig::default()
.max_tracing_requests(self.rpc_max_tracing_requests)
.max_blocking_io_requests(self.rpc_max_blocking_io_requests)
.max_trace_filter_blocks(self.rpc_max_trace_filter_blocks)
.max_blocks_per_filter(self.rpc_max_blocks_per_filter.unwrap_or_max())
.max_logs_per_response(self.rpc_max_logs_per_response.unwrap_or_max() as usize)

View File

@@ -7,18 +7,29 @@ use reth_tasks::{
pool::{BlockingTaskGuard, BlockingTaskPool},
TaskSpawner,
};
use tokio::sync::{oneshot, AcquireError, OwnedSemaphorePermit};
use std::sync::Arc;
use tokio::sync::{oneshot, AcquireError, OwnedSemaphorePermit, Semaphore};
use crate::EthApiTypes;
/// Executes code on a blocking thread.
/// Helpers for spawning blocking operations.
///
/// Operations can be blocking because they require lots of CPU work and/or IO.
///
/// This differentiates between workloads that are primarily CPU bound and generally heavier (such
/// as tracing tasks) and tasks with a more balanced profile (IO and CPU), such as `eth_call` and
/// similar calls.
///
/// It provides access to semaphores that limit how many of these tasks may run concurrently.
/// Tracing related tasks are expected to be configured with a lower threshold, because they are
/// not only CPU heavy but can also accumulate significant memory for the traces.
pub trait SpawnBlocking: EthApiTypes + Clone + Send + Sync + 'static {
/// Returns a handle for spawning IO heavy blocking tasks.
///
/// Runtime access in default trait method implementations.
fn io_task_spawner(&self) -> impl TaskSpawner;
/// Returns a handle for spawning CPU heavy blocking tasks.
/// Returns a handle for spawning __CPU heavy__ blocking tasks, such as tracing requests.
///
/// Thread pool access in default trait method implementations.
fn tracing_task_pool(&self) -> &BlockingTaskPool;
@@ -26,21 +37,121 @@ pub trait SpawnBlocking: EthApiTypes + Clone + Send + Sync + 'static {
/// Returns handle to semaphore for pool of CPU heavy blocking tasks.
fn tracing_task_guard(&self) -> &BlockingTaskGuard;
/// Returns handle to semaphore for blocking IO tasks.
///
/// This semaphore is used to limit concurrent blocking IO operations like `eth_call`,
/// `eth_estimateGas`, and similar methods that require EVM execution.
fn blocking_io_task_guard(&self) -> &Arc<Semaphore>;
/// Acquires a permit from the tracing task semaphore.
///
/// This should be used for __CPU heavy__ operations like `debug_traceTransaction`,
/// `debug_traceCall`, and similar tracing methods. These tasks are typically:
/// - Primarily CPU bound with intensive computation
/// - Can accumulate significant memory for trace results
/// - Expected to have lower concurrency limits than general blocking IO tasks
///
/// For blocking IO tasks like `eth_call` or `eth_estimateGas`, use
/// [`acquire_owned_blocking_io`](Self::acquire_owned_blocking_io) instead.
///
/// See also [`Semaphore::acquire_owned`](`tokio::sync::Semaphore::acquire_owned`).
fn acquire_owned(
fn acquire_owned_tracing(
&self,
) -> impl Future<Output = Result<OwnedSemaphorePermit, AcquireError>> + Send {
self.tracing_task_guard().clone().acquire_owned()
}
/// Acquires multiple permits from the tracing task semaphore.
///
/// This should be used for particularly heavy tracing operations that require more resources
/// than a standard trace. The permit count should reflect the expected resource consumption
/// relative to a standard tracing operation.
///
/// Like [`acquire_owned_tracing`](Self::acquire_owned_tracing), this is specifically for
/// CPU-intensive tracing tasks, not general blocking IO operations.
///
/// See also [`Semaphore::acquire_many_owned`](`tokio::sync::Semaphore::acquire_many_owned`).
fn acquire_many_owned(
fn acquire_many_owned_tracing(
&self,
n: u32,
) -> impl Future<Output = Result<OwnedSemaphorePermit, AcquireError>> + Send {
self.tracing_task_guard().clone().acquire_many_owned(n)
}
/// Acquires a permit from the blocking IO request semaphore.
///
/// This should be used for operations like `eth_call`, `eth_estimateGas`, and similar methods
/// that require EVM execution and are spawned as blocking tasks.
///
/// See also [`Semaphore::acquire_owned`](`tokio::sync::Semaphore::acquire_owned`).
fn acquire_owned_blocking_io(
&self,
) -> impl Future<Output = Result<OwnedSemaphorePermit, AcquireError>> + Send {
self.blocking_io_task_guard().clone().acquire_owned()
}
/// Acquires multiple permits from the blocking IO request semaphore.
///
/// This should be used for operations that may require more resources than a single permit
/// allows.
///
/// See also [`Semaphore::acquire_many_owned`](`tokio::sync::Semaphore::acquire_many_owned`).
fn acquire_many_owned_blocking_io(
&self,
n: u32,
) -> impl Future<Output = Result<OwnedSemaphorePermit, AcquireError>> + Send {
self.blocking_io_task_guard().clone().acquire_many_owned(n)
}
/// Acquires permits from the blocking IO request semaphore based on a calculated weight.
///
/// The weight determines the maximum number of concurrent requests of this type that can run.
/// For example, if the semaphore has 256 total permits and `weight=10`, then at most 10
/// concurrent requests of this type are allowed.
///
/// The number of permits acquired per request is calculated as `total_permits / weight`, with an
/// adjustment: if `total_permits` divides evenly by `weight`, we add 1 so that requests of this
/// type alone cannot exhaust the semaphore, leaving permits available for other tasks.
///
/// This should be used to explicitly limit concurrent requests based on their expected
/// resource consumption:
///
/// - **Block range queries**: Higher weight for larger ranges (fewer concurrent requests)
/// - **Complex calls**: Higher weight for expensive operations
/// - **Batch operations**: Higher weight for larger batches
/// - **Historical queries**: Higher weight for deeper history lookups
///
/// # Examples
///
/// ```ignore
/// // For a heavy request, use higher weight to limit concurrency
/// let weight = 20; // Allow at most 20 concurrent requests of this type
/// let _permit = self.acquire_weighted_blocking_io(weight).await?;
/// ```
///
/// This helps prevent resource exhaustion from concurrent expensive operations while allowing
/// many cheap operations to run in parallel.
///
/// See also [`Semaphore::acquire_many_owned`](`tokio::sync::Semaphore::acquire_many_owned`).
fn acquire_weighted_blocking_io(
&self,
weight: u32,
) -> impl Future<Output = Result<OwnedSemaphorePermit, AcquireError>> + Send {
let guard = self.blocking_io_task_guard();
let total_permits = guard.available_permits().max(1) as u32;
let weight = weight.max(1);
let mut permits_to_acquire = (total_permits / weight).max(1);
// If total_permits divides evenly by weight, add 1 so that concurrent requests of
// this type can never consume every permit, leaving headroom available for other
// tasks
if total_permits.is_multiple_of(weight) {
permits_to_acquire += 1;
}
guard.clone().acquire_many_owned(permits_to_acquire)
}
/// Executes the future on a new blocking task.
///
/// Note: This is expected for futures that are dominated by blocking IO operations, for tracing

View File

@@ -212,6 +212,7 @@ pub trait EthCall: EstimateCall + Call + LoadPendingBlock + LoadBlock + FullEthA
overrides: EvmOverrides,
) -> impl Future<Output = Result<Bytes, Self::Error>> + Send {
async move {
let _permit = self.acquire_owned_blocking_io().await;
let res =
self.transact_call_at(request, block_number.unwrap_or_default(), overrides).await?;

View File

@@ -97,7 +97,7 @@ pub trait EthState: LoadState + SpawnBlocking {
{
Ok(async move {
let _permit = self
.acquire_owned()
.acquire_owned_tracing()
.await
.map_err(RethError::other)
.map_err(EthApiError::Internal)?;

View File

@@ -8,9 +8,10 @@ use crate::{
};
use reqwest::Url;
use reth_rpc_server_types::constants::{
default_max_tracing_requests, DEFAULT_ETH_PROOF_WINDOW, DEFAULT_MAX_BLOCKS_PER_FILTER,
DEFAULT_MAX_LOGS_PER_RESPONSE, DEFAULT_MAX_SIMULATE_BLOCKS, DEFAULT_MAX_TRACE_FILTER_BLOCKS,
DEFAULT_PROOF_PERMITS, RPC_DEFAULT_SEND_RAW_TX_SYNC_TIMEOUT_SECS,
default_max_tracing_requests, DEFAULT_ETH_PROOF_WINDOW, DEFAULT_MAX_BLOCKING_IO_REQUEST,
DEFAULT_MAX_BLOCKS_PER_FILTER, DEFAULT_MAX_LOGS_PER_RESPONSE, DEFAULT_MAX_SIMULATE_BLOCKS,
DEFAULT_MAX_TRACE_FILTER_BLOCKS, DEFAULT_PROOF_PERMITS,
RPC_DEFAULT_SEND_RAW_TX_SYNC_TIMEOUT_SECS,
};
use serde::{Deserialize, Serialize};
@@ -68,6 +69,15 @@ pub struct EthConfig {
pub eth_proof_window: u64,
/// The maximum number of tracing calls that can be executed concurrently.
pub max_tracing_requests: usize,
/// The maximum number of blocking IO calls that can be executed concurrently.
///
/// Requests such as `eth_call`, `eth_estimateGas`, and similar calls require EVM execution,
/// which is considered blocking since it is usually heavier on the IO side while also being
/// CPU constrained. These requests are expected to be spawned as short-lived blocking Tokio
/// tasks. This setting restricts how many of them can run concurrently, to avoid a build-up
/// in Tokio's blocking pool queue, since only a limited number of threads are available.
pub max_blocking_io_requests: usize,
/// Maximum number of blocks for `trace_filter` requests.
pub max_trace_filter_blocks: u64,
/// Maximum number of blocks that could be scanned per filter request in `eth_getLogs` calls.
@@ -116,6 +126,7 @@ impl Default for EthConfig {
gas_oracle: GasPriceOracleConfig::default(),
eth_proof_window: DEFAULT_ETH_PROOF_WINDOW,
max_tracing_requests: default_max_tracing_requests(),
max_blocking_io_requests: DEFAULT_MAX_BLOCKING_IO_REQUEST,
max_trace_filter_blocks: DEFAULT_MAX_TRACE_FILTER_BLOCKS,
max_blocks_per_filter: DEFAULT_MAX_BLOCKS_PER_FILTER,
max_logs_per_response: DEFAULT_MAX_LOGS_PER_RESPONSE,
@@ -152,6 +163,12 @@ impl EthConfig {
self
}
/// Configures the maximum number of blocking IO requests
pub const fn max_blocking_io_requests(mut self, max_requests: usize) -> Self {
self.max_blocking_io_requests = max_requests;
self
}
/// Configures the maximum block length to scan per `eth_getLogs` request
pub const fn max_blocks_per_filter(mut self, max_blocks: u64) -> Self {
self.max_blocks_per_filter = max_blocks;
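A minimal sketch of the new setter alongside the existing `EthConfig` builder methods shown in this file; the import path is an assumption, while the setter names are taken from the hunks above.

```rust
use reth_rpc_eth_types::EthConfig; // assumed crate path; adjust to wherever EthConfig lives

// Sketch: allow at most 128 concurrent EVM-executing blocking requests while keeping
// the CPU-heavy tracing limit much lower.
fn tuned_eth_config() -> EthConfig {
    EthConfig::default()
        .max_tracing_requests(8)
        .max_blocking_io_requests(128)
}
```

From the CLI, the same value flows into this builder via `RethRpcServerConfig::eth_config`, as shown in the earlier hunk.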

View File

@@ -18,6 +18,20 @@ pub const DEFAULT_MAX_LOGS_PER_RESPONSE: usize = 20_000;
/// The default maximum number of blocks for `trace_filter` requests.
pub const DEFAULT_MAX_TRACE_FILTER_BLOCKS: u64 = 100;
/// Setting for how many concurrent (heavier) _blocking_ IO requests are allowed.
///
/// What is considered a blocking IO request can depend on the RPC method. In general, anything
/// that requires IO is considered blocking and should be spawned as a blocking task. This
/// setting is, however, primarily intended for heavier blocking requests that require EVM
/// execution, for example `eth_call` and similar calls. It is intended to be used with a
/// semaphore that must be acquired before a new task is spawned, to avoid unnecessary queueing
/// when the number of in-flight requests exceeds the available threads in the pool.
///
/// Tokio's blocking pool has a default limit of 512 threads, and its task queue can grow
/// unbounded. Since requests like `eth_call` also require a lot of CPU, which keeps a thread
/// occupied, this is set to a lower value.
pub const DEFAULT_MAX_BLOCKING_IO_REQUEST: usize = 256;
/// The default maximum number of tracing requests we're allowing concurrently.
/// Tracing is mostly CPU bound, so we're limiting the number of concurrent requests to something
/// lower than the number of cores, in order to minimize the impact on the rest of the system.
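The constant is meant to size a semaphore that is acquired before each `spawn_blocking` call. Below is a minimal sketch of that pattern on a plain Tokio runtime; the handler is hypothetical and only illustrates the gating, it is not the commit's code.

```rust
use std::sync::Arc;
use tokio::sync::Semaphore;

// Matches DEFAULT_MAX_BLOCKING_IO_REQUEST above; hard-coded to keep the sketch standalone.
const MAX_BLOCKING_IO_REQUESTS: usize = 256;

// Hypothetical handler: gate an EVM-heavy request behind the semaphore before
// handing it to Tokio's blocking pool.
async fn handle_eth_call_like(
    semaphore: Arc<Semaphore>,
) -> Result<u64, Box<dyn std::error::Error>> {
    // Acquire a permit first so at most 256 such tasks occupy the blocking pool at once.
    let _permit = semaphore.clone().acquire_owned().await?;
    // The permit is held until this function returns, i.e. for the task's whole lifetime.
    let out = tokio::task::spawn_blocking(|| {
        // ... run EVM execution here (blocking, CPU and IO heavy) ...
        42u64
    })
    .await?;
    Ok(out)
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let semaphore = Arc::new(Semaphore::new(MAX_BLOCKING_IO_REQUESTS));
    let _result = handle_eth_call_like(semaphore).await?;
    Ok(())
}
```

Holding the permit for the lifetime of the blocking task keeps Tokio's blocking pool queue from building up behind EVM-heavy requests, which is the build-up this constant is meant to prevent.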

View File

@@ -15,7 +15,8 @@ use reth_rpc_eth_types::{
FeeHistoryCacheConfig, ForwardConfig, GasCap, GasPriceOracle, GasPriceOracleConfig,
};
use reth_rpc_server_types::constants::{
DEFAULT_ETH_PROOF_WINDOW, DEFAULT_MAX_SIMULATE_BLOCKS, DEFAULT_PROOF_PERMITS,
DEFAULT_ETH_PROOF_WINDOW, DEFAULT_MAX_BLOCKING_IO_REQUEST, DEFAULT_MAX_SIMULATE_BLOCKS,
DEFAULT_PROOF_PERMITS,
};
use reth_tasks::{pool::BlockingTaskPool, TaskSpawner, TokioTaskExecutor};
use std::{sync::Arc, time::Duration};
@@ -41,6 +42,7 @@ pub struct EthApiBuilder<N: RpcNodeCore, Rpc, NextEnv = ()> {
task_spawner: Box<dyn TaskSpawner + 'static>,
next_env: NextEnv,
max_batch_size: usize,
max_blocking_io_requests: usize,
pending_block_kind: PendingBlockKind,
raw_tx_forwarder: ForwardConfig,
send_raw_transaction_sync_timeout: Duration,
@@ -92,6 +94,7 @@ impl<N: RpcNodeCore, Rpc, NextEnv> EthApiBuilder<N, Rpc, NextEnv> {
task_spawner,
next_env,
max_batch_size,
max_blocking_io_requests,
pending_block_kind,
raw_tx_forwarder,
send_raw_transaction_sync_timeout,
@@ -113,6 +116,7 @@ impl<N: RpcNodeCore, Rpc, NextEnv> EthApiBuilder<N, Rpc, NextEnv> {
task_spawner,
next_env,
max_batch_size,
max_blocking_io_requests,
pending_block_kind,
raw_tx_forwarder,
send_raw_transaction_sync_timeout,
@@ -145,6 +149,7 @@ where
eth_state_cache_config: Default::default(),
next_env: Default::default(),
max_batch_size: 1,
max_blocking_io_requests: DEFAULT_MAX_BLOCKING_IO_REQUEST,
pending_block_kind: PendingBlockKind::Full,
raw_tx_forwarder: ForwardConfig::default(),
send_raw_transaction_sync_timeout: Duration::from_secs(30),
@@ -184,6 +189,7 @@ where
gas_oracle_config,
next_env,
max_batch_size,
max_blocking_io_requests,
pending_block_kind,
raw_tx_forwarder,
send_raw_transaction_sync_timeout,
@@ -205,6 +211,7 @@ where
gas_oracle_config,
next_env,
max_batch_size,
max_blocking_io_requests,
pending_block_kind,
raw_tx_forwarder,
send_raw_transaction_sync_timeout,
@@ -233,6 +240,7 @@ where
gas_oracle_config,
next_env: _,
max_batch_size,
max_blocking_io_requests,
pending_block_kind,
raw_tx_forwarder,
send_raw_transaction_sync_timeout,
@@ -254,6 +262,7 @@ where
gas_oracle_config,
next_env,
max_batch_size,
max_blocking_io_requests,
pending_block_kind,
raw_tx_forwarder,
send_raw_transaction_sync_timeout,
@@ -335,6 +344,12 @@ where
self
}
/// Sets the maximum number of concurrent blocking IO requests.
pub const fn max_blocking_io_requests(mut self, max_blocking_io_requests: usize) -> Self {
self.max_blocking_io_requests = max_blocking_io_requests;
self
}
/// Sets the pending block kind
pub const fn pending_block_kind(mut self, pending_block_kind: PendingBlockKind) -> Self {
self.pending_block_kind = pending_block_kind;
@@ -482,6 +497,7 @@ where
task_spawner,
next_env,
max_batch_size,
max_blocking_io_requests,
pending_block_kind,
raw_tx_forwarder,
send_raw_transaction_sync_timeout,
@@ -523,6 +539,7 @@ where
rpc_converter,
next_env,
max_batch_size,
max_blocking_io_requests,
pending_block_kind,
raw_tx_forwarder.forwarder_client(),
send_raw_transaction_sync_timeout,

View File

@@ -33,7 +33,7 @@ use reth_transaction_pool::{
blobstore::BlobSidecarConverter, noop::NoopTransactionPool, AddedTransactionOutcome,
BatchTxProcessor, BatchTxRequest, TransactionPool,
};
use tokio::sync::{broadcast, mpsc, Mutex};
use tokio::sync::{broadcast, mpsc, Mutex, Semaphore};
const DEFAULT_BROADCAST_CAPACITY: usize = 2000;
@@ -152,6 +152,7 @@ where
proof_permits: usize,
rpc_converter: Rpc,
max_batch_size: usize,
max_blocking_io_requests: usize,
pending_block_kind: PendingBlockKind,
raw_tx_forwarder: ForwardConfig,
send_raw_transaction_sync_timeout: Duration,
@@ -171,6 +172,7 @@ where
rpc_converter,
(),
max_batch_size,
max_blocking_io_requests,
pending_block_kind,
raw_tx_forwarder.forwarder_client(),
send_raw_transaction_sync_timeout,
@@ -263,6 +265,11 @@ where
fn tracing_task_guard(&self) -> &BlockingTaskGuard {
self.inner.blocking_task_guard()
}
#[inline]
fn blocking_io_task_guard(&self) -> &std::sync::Arc<tokio::sync::Semaphore> {
self.inner.blocking_io_request_semaphore()
}
}
/// Container type `EthApi`
@@ -296,6 +303,9 @@ pub struct EthApiInner<N: RpcNodeCore, Rpc: RpcConvert> {
/// Guard for getproof calls
blocking_task_guard: BlockingTaskGuard,
/// Semaphore to limit concurrent blocking IO requests (`eth_call`, `eth_estimateGas`, etc.)
blocking_io_request_semaphore: Arc<Semaphore>,
/// Transaction broadcast channel
raw_tx_sender: broadcast::Sender<Bytes>,
@@ -346,6 +356,7 @@ where
converter: Rpc,
next_env: impl PendingEnvBuilder<N::Evm>,
max_batch_size: usize,
max_blocking_io_requests: usize,
pending_block_kind: PendingBlockKind,
raw_tx_forwarder: Option<RpcClient>,
send_raw_transaction_sync_timeout: Duration,
@@ -384,6 +395,7 @@ where
blocking_task_pool,
fee_history_cache,
blocking_task_guard: BlockingTaskGuard::new(proof_permits),
blocking_io_request_semaphore: Arc::new(Semaphore::new(max_blocking_io_requests)),
raw_tx_sender,
raw_tx_forwarder,
converter,
@@ -440,6 +452,8 @@ where
}
/// Returns a handle to the blocking thread pool.
///
/// This is intended for tasks that are CPU bound.
#[inline]
pub const fn blocking_task_pool(&self) -> &BlockingTaskPool {
&self.blocking_task_pool
@@ -576,6 +590,12 @@ where
pub const fn evm_memory_limit(&self) -> u64 {
self.evm_memory_limit
}
/// Returns a reference to the blocking IO request semaphore.
#[inline]
pub const fn blocking_io_request_semaphore(&self) -> &Arc<Semaphore> {
&self.blocking_io_request_semaphore
}
}
#[cfg(test)]

View File

@@ -404,6 +404,13 @@ RPC:
[default: <NUM CPU CORES-2>]
--rpc.max-blocking-io-requests <COUNT>
Maximum number of concurrent blocking IO requests.
Blocking IO requests include `eth_call`, `eth_estimateGas`, and similar methods that require EVM execution. These are spawned as blocking tasks to avoid blocking the async runtime.
[default: 256]
--rpc.max-trace-filter-blocks <COUNT>
Maximum number of blocks for `trace_filter` requests

View File

@@ -404,6 +404,13 @@ RPC:
[default: <NUM CPU CORES-2>]
--rpc.max-blocking-io-requests <COUNT>
Maximum number of concurrent blocking IO requests.
Blocking IO requests include `eth_call`, `eth_estimateGas`, and similar methods that require EVM execution. These are spawned as blocking tasks to avoid blocking the async runtime.
[default: 256]
--rpc.max-trace-filter-blocks <COUNT>
Maximum number of blocks for `trace_filter` requests