diff --git a/crates/node/builder/src/rpc.rs b/crates/node/builder/src/rpc.rs index d620474385..e1dfb58d13 100644 --- a/crates/node/builder/src/rpc.rs +++ b/crates/node/builder/src/rpc.rs @@ -1179,6 +1179,7 @@ impl<'a, N: FullNodeComponents &BlockingTaskGuard { self.inner.eth_api.blocking_task_guard() } + + #[inline] + fn blocking_io_task_guard(&self) -> &Arc { + self.inner.eth_api.blocking_io_request_semaphore() + } } impl LoadFee for OpEthApi diff --git a/crates/rpc/rpc-builder/src/config.rs b/crates/rpc/rpc-builder/src/config.rs index 6198b0ee0a..c9bb5af8c2 100644 --- a/crates/rpc/rpc-builder/src/config.rs +++ b/crates/rpc/rpc-builder/src/config.rs @@ -94,6 +94,7 @@ impl RethRpcServerConfig for RpcServerArgs { fn eth_config(&self) -> EthConfig { EthConfig::default() .max_tracing_requests(self.rpc_max_tracing_requests) + .max_blocking_io_requests(self.rpc_max_blocking_io_requests) .max_trace_filter_blocks(self.rpc_max_trace_filter_blocks) .max_blocks_per_filter(self.rpc_max_blocks_per_filter.unwrap_or_max()) .max_logs_per_response(self.rpc_max_logs_per_response.unwrap_or_max() as usize) diff --git a/crates/rpc/rpc-eth-api/src/helpers/blocking_task.rs b/crates/rpc/rpc-eth-api/src/helpers/blocking_task.rs index 886ff63914..c174cd9bde 100644 --- a/crates/rpc/rpc-eth-api/src/helpers/blocking_task.rs +++ b/crates/rpc/rpc-eth-api/src/helpers/blocking_task.rs @@ -7,18 +7,29 @@ use reth_tasks::{ pool::{BlockingTaskGuard, BlockingTaskPool}, TaskSpawner, }; -use tokio::sync::{oneshot, AcquireError, OwnedSemaphorePermit}; +use std::sync::Arc; +use tokio::sync::{oneshot, AcquireError, OwnedSemaphorePermit, Semaphore}; use crate::EthApiTypes; -/// Executes code on a blocking thread. +/// Helpers for spawning blocking operations. +/// +/// Operations can be blocking because they require lots of CPU work and/or IO. 
+/// +/// This differentiates between workloads that are primarily CPU bound and heavier in general (such +/// as tracing tasks) and tasks that have a more balanced profile (IO and CPU), such as `eth_call` +/// and the like. +/// +/// This provides access to semaphores that limit how many of those may run concurrently. +/// It's expected that tracing related tasks are configured with a lower threshold, because not only +/// are they CPU heavy but they can also accumulate more memory for the traces. pub trait SpawnBlocking: EthApiTypes + Clone + Send + Sync + 'static { /// Returns a handle for spawning IO heavy blocking tasks. /// /// Runtime access in default trait method implementations. fn io_task_spawner(&self) -> impl TaskSpawner; - /// Returns a handle for spawning CPU heavy blocking tasks. + /// Returns a handle for spawning __CPU heavy__ blocking tasks, such as tracing requests. /// /// Thread pool access in default trait method implementations. fn tracing_task_pool(&self) -> &BlockingTaskPool; @@ -26,21 +37,121 @@ pub trait SpawnBlocking: EthApiTypes + Clone + Send + Sync + 'static { /// Returns handle to semaphore for pool of CPU heavy blocking tasks. fn tracing_task_guard(&self) -> &BlockingTaskGuard; + /// Returns a handle to the semaphore for blocking IO tasks. + /// + /// This semaphore is used to limit concurrent blocking IO operations like `eth_call`, + /// `eth_estimateGas`, and similar methods that require EVM execution. + fn blocking_io_task_guard(&self) -> &Arc; + + /// Acquires a permit from the tracing task semaphore. + /// + /// This should be used for __CPU heavy__ operations like `debug_traceTransaction`, + /// `debug_traceCall`, and similar tracing methods. 
These tasks are typically: + /// - Primarily CPU bound with intensive computation + /// - Can accumulate significant memory for trace results + /// - Expected to have lower concurrency limits than general blocking IO tasks + /// + /// For blocking IO tasks like `eth_call` or `eth_estimateGas`, use + /// [`acquire_owned_blocking_io`](Self::acquire_owned_blocking_io) instead. + /// /// See also [`Semaphore::acquire_owned`](`tokio::sync::Semaphore::acquire_owned`). - fn acquire_owned( + fn acquire_owned_tracing( &self, ) -> impl Future> + Send { self.tracing_task_guard().clone().acquire_owned() } + /// Acquires multiple permits from the tracing task semaphore. + /// + /// This should be used for particularly heavy tracing operations that require more resources + /// than a standard trace. The permit count should reflect the expected resource consumption + /// relative to a standard tracing operation. + /// + /// Like [`acquire_owned_tracing`](Self::acquire_owned_tracing), this is specifically for + /// CPU-intensive tracing tasks, not general blocking IO operations. + /// /// See also [`Semaphore::acquire_many_owned`](`tokio::sync::Semaphore::acquire_many_owned`). - fn acquire_many_owned( + fn acquire_many_owned_tracing( &self, n: u32, ) -> impl Future> + Send { self.tracing_task_guard().clone().acquire_many_owned(n) } + /// Acquires a permit from the blocking IO request semaphore. + /// + /// This should be used for operations like `eth_call`, `eth_estimateGas`, and similar methods + /// that require EVM execution and are spawned as blocking tasks. + /// + /// See also [`Semaphore::acquire_owned`](`tokio::sync::Semaphore::acquire_owned`). + fn acquire_owned_blocking_io( + &self, + ) -> impl Future> + Send { + self.blocking_io_task_guard().clone().acquire_owned() + } + + /// Acquires multiple permits from the blocking IO request semaphore. + /// + /// This should be used for operations that may require more resources than a single permit + /// allows. 
+    ///
+    /// See also [`Semaphore::acquire_many_owned`](`tokio::sync::Semaphore::acquire_many_owned`).
+    fn acquire_many_owned_blocking_io(
+        &self,
+        n: u32,
+    ) -> impl Future<Output = Result<OwnedSemaphorePermit, AcquireError>> + Send {
+        self.blocking_io_task_guard().clone().acquire_many_owned(n)
+    }
+
+    /// Acquires permits from the blocking IO request semaphore based on a calculated weight.
+    ///
+    /// The weight is the targeted number of concurrent requests of this type: every request
+    /// acquires roughly `1 / weight` of the permits. For example, with 256 free permits and
+    /// `weight = 10` each request acquires 25 permits, so at most 10 such requests run
+    /// concurrently. A _lower_ weight therefore means _fewer_ concurrent requests.
+    ///
+    /// The permits acquired per request are calculated as `available_permits / weight` (integer
+    /// division, at least 1). If `available_permits` is an exact multiple of `weight`, one extra
+    /// permit is acquired so that `weight` requests of this size can never hold their permits
+    /// all at once; e.g. 256 free permits and `weight = 8` result in 33 permits per request and
+    /// at most 7 concurrent requests. The extra permit is skipped if it would push the request
+    /// past `available_permits` (only possible for `weight = 1`), because a request for more
+    /// permits than the semaphore holds would never complete.
+    ///
+    /// Note that the share is derived from the permits that are free at the time of the call,
+    /// not from the configured capacity, so requests issued while the semaphore is already in
+    /// use acquire fewer permits. A `weight` of `0` is treated as `1`.
+    ///
+    /// This should be used to explicitly limit concurrent requests based on their expected
+    /// resource consumption:
+    ///
+    /// - **Block range queries**: Lower weight for larger ranges (fewer concurrent requests)
+    /// - **Complex calls**: Lower weight for expensive operations
+    /// - **Batch operations**: Lower weight for larger batches
+    /// - **Historical queries**: Lower weight for deeper history lookups
+    ///
+    /// # Examples
+    ///
+    /// ```ignore
+    /// // For a heavy request, use a low weight to limit concurrency
+    /// let weight = 20; // Allow roughly 20 concurrent requests of this type
+    /// let _permit = self.acquire_weighted_blocking_io(weight).await?;
+    /// ```
+    ///
+    /// This helps prevent resource exhaustion from concurrent expensive operations while allowing
+    /// many cheap operations to run in parallel.
+    ///
+    /// See also [`Semaphore::acquire_many_owned`](`tokio::sync::Semaphore::acquire_many_owned`).
+    fn acquire_weighted_blocking_io(
+        &self,
+        weight: u32,
+    ) -> impl Future<Output = Result<OwnedSemaphorePermit, AcquireError>> + Send {
+        let guard = self.blocking_io_task_guard();
+        // Saturate rather than truncate with `as`: a wrapped count would skew the division below.
+        let available = u32::try_from(guard.available_permits()).unwrap_or(u32::MAX).max(1);
+        let weight = weight.max(1);
+        let mut permits_to_acquire = (available / weight).max(1);
+
+        // On an exact split take one extra permit so that `weight` requests of this size cannot
+        // all hold permits at once. Never go past `available`: a request for more permits than
+        // the semaphore holds can never be granted and would wait forever.
+        if available.is_multiple_of(weight) && permits_to_acquire < available {
+            permits_to_acquire += 1;
+        }
+
+        guard.clone().acquire_many_owned(permits_to_acquire)
+    }
+
     /// Executes the future on a new blocking task.
     ///
     /// Note: This is expected for futures that are dominated by blocking IO operations, for tracing
diff --git a/crates/rpc/rpc-eth-api/src/helpers/call.rs b/crates/rpc/rpc-eth-api/src/helpers/call.rs
index 78225dd9b7..ea0fdce5ed 100644
--- a/crates/rpc/rpc-eth-api/src/helpers/call.rs
+++ b/crates/rpc/rpc-eth-api/src/helpers/call.rs
@@ -212,6 +212,7 @@ pub trait EthCall: EstimateCall + Call + LoadPendingBlock + LoadBlock + FullEthA
         overrides: EvmOverrides,
     ) -> impl Future> + Send {
         async move {
+            let _permit = self.acquire_owned_blocking_io().await;
             let res =
                 self.transact_call_at(request, block_number.unwrap_or_default(), overrides).await?;
 
diff --git a/crates/rpc/rpc-eth-api/src/helpers/state.rs b/crates/rpc/rpc-eth-api/src/helpers/state.rs
index 4125401109..9f55bae972 100644
--- a/crates/rpc/rpc-eth-api/src/helpers/state.rs
+++ b/crates/rpc/rpc-eth-api/src/helpers/state.rs
@@ -97,7 +97,7 @@ pub trait EthState: LoadState + SpawnBlocking {
     {
         Ok(async move {
             let _permit = self
-                .acquire_owned()
+                .acquire_owned_tracing()
                 .await
                 .map_err(RethError::other)
                 .map_err(EthApiError::Internal)?;
diff --git a/crates/rpc/rpc-eth-types/src/builder/config.rs b/crates/rpc/rpc-eth-types/src/builder/config.rs
index d537302b02..7b09a3144a 100644
--- a/crates/rpc/rpc-eth-types/src/builder/config.rs
+++ 
b/crates/rpc/rpc-eth-types/src/builder/config.rs @@ -8,9 +8,10 @@ use crate::{ }; use reqwest::Url; use reth_rpc_server_types::constants::{ - default_max_tracing_requests, DEFAULT_ETH_PROOF_WINDOW, DEFAULT_MAX_BLOCKS_PER_FILTER, - DEFAULT_MAX_LOGS_PER_RESPONSE, DEFAULT_MAX_SIMULATE_BLOCKS, DEFAULT_MAX_TRACE_FILTER_BLOCKS, - DEFAULT_PROOF_PERMITS, RPC_DEFAULT_SEND_RAW_TX_SYNC_TIMEOUT_SECS, + default_max_tracing_requests, DEFAULT_ETH_PROOF_WINDOW, DEFAULT_MAX_BLOCKING_IO_REQUEST, + DEFAULT_MAX_BLOCKS_PER_FILTER, DEFAULT_MAX_LOGS_PER_RESPONSE, DEFAULT_MAX_SIMULATE_BLOCKS, + DEFAULT_MAX_TRACE_FILTER_BLOCKS, DEFAULT_PROOF_PERMITS, + RPC_DEFAULT_SEND_RAW_TX_SYNC_TIMEOUT_SECS, }; use serde::{Deserialize, Serialize}; @@ -68,6 +69,15 @@ pub struct EthConfig { pub eth_proof_window: u64, /// The maximum number of tracing calls that can be executed in concurrently. pub max_tracing_requests: usize, + /// The maximum number of blocking IO calls that can be executed concurrently. + /// + /// Requests such as `eth_call`, `eth_estimateGas` and the like require EVM execution, which is + /// considered blocking since it's usually heavier on the IO side but also CPU constrained. + /// It is expected that these are spawned as short-lived blocking tokio tasks. This config + /// determines how many can be spawned concurrently, to avoid a build-up in tokio's + /// blocking pool queue since there's only a limited number of threads available. This setting + /// restricts how many tasks are spawned concurrently. + pub max_blocking_io_requests: usize, /// Maximum number of blocks for `trace_filter` requests. pub max_trace_filter_blocks: u64, /// Maximum number of blocks that could be scanned per filter request in `eth_getLogs` calls. 
@@ -116,6 +126,7 @@ impl Default for EthConfig { gas_oracle: GasPriceOracleConfig::default(), eth_proof_window: DEFAULT_ETH_PROOF_WINDOW, max_tracing_requests: default_max_tracing_requests(), + max_blocking_io_requests: DEFAULT_MAX_BLOCKING_IO_REQUEST, max_trace_filter_blocks: DEFAULT_MAX_TRACE_FILTER_BLOCKS, max_blocks_per_filter: DEFAULT_MAX_BLOCKS_PER_FILTER, max_logs_per_response: DEFAULT_MAX_LOGS_PER_RESPONSE, @@ -152,6 +163,12 @@ impl EthConfig { self } + /// Configures the maximum number of concurrent blocking IO requests. + pub const fn max_blocking_io_requests(mut self, max_requests: usize) -> Self { + self.max_blocking_io_requests = max_requests; + self + } + /// Configures the maximum block length to scan per `eth_getLogs` request pub const fn max_blocks_per_filter(mut self, max_blocks: u64) -> Self { self.max_blocks_per_filter = max_blocks; diff --git a/crates/rpc/rpc-server-types/src/constants.rs b/crates/rpc/rpc-server-types/src/constants.rs index 8861af7b54..acf5294fe9 100644 --- a/crates/rpc/rpc-server-types/src/constants.rs +++ b/crates/rpc/rpc-server-types/src/constants.rs @@ -18,6 +18,20 @@ pub const DEFAULT_MAX_LOGS_PER_RESPONSE: usize = 20_000; /// The default maximum number of blocks for `trace_filter` requests. pub const DEFAULT_MAX_TRACE_FILTER_BLOCKS: u64 = 100; +/// Setting for how many concurrent (heavier) _blocking_ IO requests are allowed. +/// +/// What is considered a blocking IO request can depend on the RPC method. In general anything that +/// requires IO is considered blocking and should be spawned as blocking. This setting is, however, +/// primarily intended for heavier blocking requests that require EVM execution, for example +/// `eth_call` and the like. This is intended to be used with a semaphore that must be acquired +/// before a new task is spawned, to avoid unnecessary pooling if the number of inflight requests +/// exceeds the available threads in the pool. 
+/// +/// tokio's blocking pool has a default limit of 512 threads and its queue could grow unbounded. +/// Since requests like `eth_call` also require a lot of CPU, which will occupy the thread, we +/// can set this to a lower value. +pub const DEFAULT_MAX_BLOCKING_IO_REQUEST: usize = 256; + /// The default maximum number tracing requests we're allowing concurrently. /// Tracing is mostly CPU bound so we're limiting the number of concurrent requests to something /// lower that the number of cores, in order to minimize the impact on the rest of the system. diff --git a/crates/rpc/rpc/src/eth/builder.rs b/crates/rpc/rpc/src/eth/builder.rs index ff01903736..9642ca97be 100644 --- a/crates/rpc/rpc/src/eth/builder.rs +++ b/crates/rpc/rpc/src/eth/builder.rs @@ -15,7 +15,8 @@ use reth_rpc_eth_types::{ FeeHistoryCacheConfig, ForwardConfig, GasCap, GasPriceOracle, GasPriceOracleConfig, }; use reth_rpc_server_types::constants::{ - DEFAULT_ETH_PROOF_WINDOW, DEFAULT_MAX_SIMULATE_BLOCKS, DEFAULT_PROOF_PERMITS, + DEFAULT_ETH_PROOF_WINDOW, DEFAULT_MAX_BLOCKING_IO_REQUEST, DEFAULT_MAX_SIMULATE_BLOCKS, + DEFAULT_PROOF_PERMITS, }; use reth_tasks::{pool::BlockingTaskPool, TaskSpawner, TokioTaskExecutor}; use std::{sync::Arc, time::Duration}; @@ -41,6 +42,7 @@ pub struct EthApiBuilder { task_spawner: Box, next_env: NextEnv, max_batch_size: usize, + max_blocking_io_requests: usize, pending_block_kind: PendingBlockKind, raw_tx_forwarder: ForwardConfig, send_raw_transaction_sync_timeout: Duration, @@ -92,6 +94,7 @@ impl EthApiBuilder { task_spawner, next_env, max_batch_size, + max_blocking_io_requests, pending_block_kind, raw_tx_forwarder, send_raw_transaction_sync_timeout, @@ -113,6 +116,7 @@ impl EthApiBuilder { task_spawner, next_env, max_batch_size, + max_blocking_io_requests, pending_block_kind, raw_tx_forwarder, send_raw_transaction_sync_timeout, @@ -145,6 +149,7 @@ where eth_state_cache_config: Default::default(), next_env: Default::default(), max_batch_size: 1, + max_blocking_io_requests: 
DEFAULT_MAX_BLOCKING_IO_REQUEST, pending_block_kind: PendingBlockKind::Full, raw_tx_forwarder: ForwardConfig::default(), send_raw_transaction_sync_timeout: Duration::from_secs(30), @@ -184,6 +189,7 @@ where gas_oracle_config, next_env, max_batch_size, + max_blocking_io_requests, pending_block_kind, raw_tx_forwarder, send_raw_transaction_sync_timeout, @@ -205,6 +211,7 @@ where gas_oracle_config, next_env, max_batch_size, + max_blocking_io_requests, pending_block_kind, raw_tx_forwarder, send_raw_transaction_sync_timeout, @@ -233,6 +240,7 @@ where gas_oracle_config, next_env: _, max_batch_size, + max_blocking_io_requests, pending_block_kind, raw_tx_forwarder, send_raw_transaction_sync_timeout, @@ -254,6 +262,7 @@ where gas_oracle_config, next_env, max_batch_size, + max_blocking_io_requests, pending_block_kind, raw_tx_forwarder, send_raw_transaction_sync_timeout, @@ -335,6 +344,12 @@ where self } + /// Sets the maximum number of concurrent blocking IO requests. + pub const fn max_blocking_io_requests(mut self, max_blocking_io_requests: usize) -> Self { + self.max_blocking_io_requests = max_blocking_io_requests; + self + } + /// Sets the pending block kind pub const fn pending_block_kind(mut self, pending_block_kind: PendingBlockKind) -> Self { self.pending_block_kind = pending_block_kind; @@ -482,6 +497,7 @@ where task_spawner, next_env, max_batch_size, + max_blocking_io_requests, pending_block_kind, raw_tx_forwarder, send_raw_transaction_sync_timeout, @@ -523,6 +539,7 @@ where rpc_converter, next_env, max_batch_size, + max_blocking_io_requests, pending_block_kind, raw_tx_forwarder.forwarder_client(), send_raw_transaction_sync_timeout, diff --git a/crates/rpc/rpc/src/eth/core.rs b/crates/rpc/rpc/src/eth/core.rs index 5b16efd6b4..a1021ab7ce 100644 --- a/crates/rpc/rpc/src/eth/core.rs +++ b/crates/rpc/rpc/src/eth/core.rs @@ -33,7 +33,7 @@ use reth_transaction_pool::{ blobstore::BlobSidecarConverter, noop::NoopTransactionPool, AddedTransactionOutcome, BatchTxProcessor, 
BatchTxRequest, TransactionPool, }; -use tokio::sync::{broadcast, mpsc, Mutex}; +use tokio::sync::{broadcast, mpsc, Mutex, Semaphore}; const DEFAULT_BROADCAST_CAPACITY: usize = 2000; @@ -152,6 +152,7 @@ where proof_permits: usize, rpc_converter: Rpc, max_batch_size: usize, + max_blocking_io_requests: usize, pending_block_kind: PendingBlockKind, raw_tx_forwarder: ForwardConfig, send_raw_transaction_sync_timeout: Duration, @@ -171,6 +172,7 @@ where rpc_converter, (), max_batch_size, + max_blocking_io_requests, pending_block_kind, raw_tx_forwarder.forwarder_client(), send_raw_transaction_sync_timeout, @@ -263,6 +265,11 @@ where fn tracing_task_guard(&self) -> &BlockingTaskGuard { self.inner.blocking_task_guard() } + + #[inline] + fn blocking_io_task_guard(&self) -> &std::sync::Arc { + self.inner.blocking_io_request_semaphore() + } } /// Container type `EthApi` @@ -296,6 +303,9 @@ pub struct EthApiInner { /// Guard for getproof calls blocking_task_guard: BlockingTaskGuard, + /// Semaphore to limit concurrent blocking IO requests (`eth_call`, `eth_estimateGas`, etc.) + blocking_io_request_semaphore: Arc, + /// Transaction broadcast channel raw_tx_sender: broadcast::Sender, @@ -346,6 +356,7 @@ where converter: Rpc, next_env: impl PendingEnvBuilder, max_batch_size: usize, + max_blocking_io_requests: usize, pending_block_kind: PendingBlockKind, raw_tx_forwarder: Option, send_raw_transaction_sync_timeout: Duration, @@ -384,6 +395,7 @@ where blocking_task_pool, fee_history_cache, blocking_task_guard: BlockingTaskGuard::new(proof_permits), + blocking_io_request_semaphore: Arc::new(Semaphore::new(max_blocking_io_requests)), raw_tx_sender, raw_tx_forwarder, converter, @@ -440,6 +452,8 @@ where } /// Returns a handle to the blocking thread pool. + /// + /// This is intended for tasks that are CPU bound. 
#[inline] pub const fn blocking_task_pool(&self) -> &BlockingTaskPool { &self.blocking_task_pool @@ -576,6 +590,12 @@ where pub const fn evm_memory_limit(&self) -> u64 { self.evm_memory_limit } + + /// Returns a reference to the blocking IO request semaphore. + #[inline] + pub const fn blocking_io_request_semaphore(&self) -> &Arc { + &self.blocking_io_request_semaphore + } } #[cfg(test)] diff --git a/docs/vocs/docs/pages/cli/op-reth/node.mdx b/docs/vocs/docs/pages/cli/op-reth/node.mdx index 2883cae22c..fc09b7a8cd 100644 --- a/docs/vocs/docs/pages/cli/op-reth/node.mdx +++ b/docs/vocs/docs/pages/cli/op-reth/node.mdx @@ -404,6 +404,13 @@ RPC: [default: ] + --rpc.max-blocking-io-requests + Maximum number of concurrent blocking IO requests. + + Blocking IO requests include `eth_call`, `eth_estimateGas`, and similar methods that require EVM execution. These are spawned as blocking tasks to avoid blocking the async runtime. + + [default: 256] + --rpc.max-trace-filter-blocks Maximum number of blocks for `trace_filter` requests diff --git a/docs/vocs/docs/pages/cli/reth/node.mdx b/docs/vocs/docs/pages/cli/reth/node.mdx index 076fd06eee..8746a770e1 100644 --- a/docs/vocs/docs/pages/cli/reth/node.mdx +++ b/docs/vocs/docs/pages/cli/reth/node.mdx @@ -404,6 +404,13 @@ RPC: [default: ] + --rpc.max-blocking-io-requests + Maximum number of concurrent blocking IO requests. + + Blocking IO requests include `eth_call`, `eth_estimateGas`, and similar methods that require EVM execution. These are spawned as blocking tasks to avoid blocking the async runtime. + + [default: 256] + --rpc.max-trace-filter-blocks Maximum number of blocks for `trace_filter` requests