From 1ec4d462a22574ddf3baa88e507d6fbbf879eb40 Mon Sep 17 00:00:00 2001 From: Alessandro Mazza <121622391+alessandromazza98@users.noreply.github.com> Date: Mon, 4 Sep 2023 19:19:11 +0200 Subject: [PATCH] add semaphore to EthStateCacheService (#4477) Co-authored-by: Matthias Seitz --- crates/rpc/rpc/src/eth/cache/config.rs | 8 +++++++ crates/rpc/rpc/src/eth/cache/mod.rs | 32 ++++++++++++++++++++++---- 2 files changed, 36 insertions(+), 4 deletions(-) diff --git a/crates/rpc/rpc/src/eth/cache/config.rs b/crates/rpc/rpc/src/eth/cache/config.rs index da7037e72a..d8a01cfb35 100644 --- a/crates/rpc/rpc/src/eth/cache/config.rs +++ b/crates/rpc/rpc/src/eth/cache/config.rs @@ -21,6 +21,9 @@ pub const DEFAULT_RECEIPT_CACHE_MAX_LEN: u32 = 2000; /// Default cache size for the env cache: 1000 envs. pub const DEFAULT_ENV_CACHE_MAX_LEN: u32 = 1000; +/// Default number of concurrent database requests. +pub const DEFAULT_CONCURRENT_DB_REQUESTS: usize = 512; + /// Settings for the [EthStateCache](crate::eth::cache::EthStateCache). #[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] @@ -37,6 +40,10 @@ pub struct EthStateCacheConfig { /// /// Default is 1000. pub max_envs: u32, + /// Max number of concurrent database requests. + /// + /// Default is 512. + pub max_concurrent_db_requests: usize, } impl Default for EthStateCacheConfig { @@ -45,6 +52,7 @@ impl Default for EthStateCacheConfig { max_blocks: DEFAULT_BLOCK_CACHE_MAX_LEN, max_receipts: DEFAULT_RECEIPT_CACHE_MAX_LEN, max_envs: DEFAULT_ENV_CACHE_MAX_LEN, + max_concurrent_db_requests: DEFAULT_CONCURRENT_DB_REQUESTS, } } } diff --git a/crates/rpc/rpc/src/eth/cache/mod.rs b/crates/rpc/rpc/src/eth/cache/mod.rs index 8209a77ec3..61424c1b1b 100644 --- a/crates/rpc/rpc/src/eth/cache/mod.rs +++ b/crates/rpc/rpc/src/eth/cache/mod.rs @@ -12,11 +12,12 @@ use schnellru::{ByLength, Limiter}; use std::{ future::Future, pin::Pin, + sync::Arc, task::{ready, Context, Poll}, }; use tokio::sync::{ mpsc::{unbounded_channel, UnboundedSender}, - oneshot, + oneshot, Semaphore, }; use tokio_stream::wrappers::UnboundedReceiverStream; @@ -68,6 +69,7 @@ impl EthStateCache { max_blocks: u32, max_receipts: u32, max_envs: u32, + max_concurrent_db_operations: usize, ) -> (Self, EthStateCacheService) { let (to_service, rx) = unbounded_channel(); let service = EthStateCacheService { @@ -78,6 +80,7 @@ impl EthStateCache { action_tx: to_service.clone(), action_rx: UnboundedReceiverStream::new(rx), action_task_spawner, + rate_limiter: Arc::new(Semaphore::new(max_concurrent_db_operations)), }; let cache = EthStateCache { to_service }; (cache, service) @@ -107,9 +110,16 @@ impl EthStateCache { Provider: StateProviderFactory + BlockReader + EvmEnvProvider + Clone + Unpin + 'static, Tasks: TaskSpawner + Clone + 'static, { - let EthStateCacheConfig { max_blocks, max_receipts, max_envs } = config; - let (this, service) = - Self::create(provider, executor.clone(), max_blocks, max_receipts, max_envs); + let EthStateCacheConfig { max_blocks, max_receipts, max_envs, max_concurrent_db_requests } = + config; + let (this, service) = Self::create( + provider, + executor.clone(), + max_blocks, + max_receipts, + max_envs, + max_concurrent_db_requests, + ); executor.spawn_critical("eth state cache", Box::pin(service)); this } @@ -229,6 +239,8 @@ pub(crate) struct EthStateCacheService< action_rx: UnboundedReceiverStream, /// The type that's used to spawn tasks that do the actual work action_task_spawner: Tasks, + /// Rate limiter + rate_limiter: Arc, } impl EthStateCacheService @@ -308,7 +320,10 @@ where if this.full_block_cache.queue(block_hash, Either::Left(response_tx)) { let provider = this.provider.clone(); let action_tx = this.action_tx.clone(); + let rate_limiter = this.rate_limiter.clone(); this.action_task_spawner.spawn_blocking(Box::pin(async move { + // Acquire permit + let _permit = rate_limiter.acquire().await; // Only look in the database to prevent situations where we // looking up the tree is blocking let res = provider @@ -329,7 +344,10 @@ where if this.full_block_cache.queue(block_hash, Either::Right(response_tx)) { let provider = this.provider.clone(); let action_tx = this.action_tx.clone(); + let rate_limiter = this.rate_limiter.clone(); this.action_task_spawner.spawn_blocking(Box::pin(async move { + // Acquire permit + let _permit = rate_limiter.acquire().await; // Only look in the database to prevent situations where we // looking up the tree is blocking let res = provider @@ -350,7 +368,10 @@ where if this.receipts_cache.queue(block_hash, response_tx) { let provider = this.provider.clone(); let action_tx = this.action_tx.clone(); + let rate_limiter = this.rate_limiter.clone(); this.action_task_spawner.spawn_blocking(Box::pin(async move { + // Acquire permit + let _permit = rate_limiter.acquire().await; let res = provider.receipts_by_block(block_hash.into()); let _ = action_tx .send(CacheAction::ReceiptsResult { block_hash, res }); @@ -369,7 +390,10 @@ where if this.evm_env_cache.queue(block_hash, response_tx) { let provider = this.provider.clone(); let action_tx = this.action_tx.clone(); + let rate_limiter = this.rate_limiter.clone(); this.action_task_spawner.spawn_blocking(Box::pin(async move { + // Acquire permit + let _permit = rate_limiter.acquire().await; let mut cfg = CfgEnv::default(); let mut block_env = BlockEnv::default(); let res = provider