add semaphore to EthStateCacheService (#4477)

Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
This commit is contained in:
Alessandro Mazza
2023-09-04 19:19:11 +02:00
committed by GitHub
parent d2647dcc7c
commit 1ec4d462a2
2 changed files with 36 additions and 4 deletions

View File

@@ -21,6 +21,9 @@ pub const DEFAULT_RECEIPT_CACHE_MAX_LEN: u32 = 2000;
/// Default cache size for the env cache: 1000 envs.
pub const DEFAULT_ENV_CACHE_MAX_LEN: u32 = 1000;
/// Default number of concurrent database requests.
pub const DEFAULT_CONCURRENT_DB_REQUESTS: usize = 512;
/// Settings for the [EthStateCache](crate::eth::cache::EthStateCache).
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
@@ -37,6 +40,10 @@ pub struct EthStateCacheConfig {
///
/// Default is 1000.
pub max_envs: u32,
/// Max number of concurrent database requests.
///
/// Default is 512.
pub max_concurrent_db_requests: usize,
}
impl Default for EthStateCacheConfig {
@@ -45,6 +52,7 @@ impl Default for EthStateCacheConfig {
max_blocks: DEFAULT_BLOCK_CACHE_MAX_LEN,
max_receipts: DEFAULT_RECEIPT_CACHE_MAX_LEN,
max_envs: DEFAULT_ENV_CACHE_MAX_LEN,
max_concurrent_db_requests: DEFAULT_CONCURRENT_DB_REQUESTS,
}
}
}

View File

@@ -12,11 +12,12 @@ use schnellru::{ByLength, Limiter};
use std::{
future::Future,
pin::Pin,
sync::Arc,
task::{ready, Context, Poll},
};
use tokio::sync::{
mpsc::{unbounded_channel, UnboundedSender},
oneshot,
oneshot, Semaphore,
};
use tokio_stream::wrappers::UnboundedReceiverStream;
@@ -68,6 +69,7 @@ impl EthStateCache {
max_blocks: u32,
max_receipts: u32,
max_envs: u32,
max_concurrent_db_operations: usize,
) -> (Self, EthStateCacheService<Provider, Tasks>) {
let (to_service, rx) = unbounded_channel();
let service = EthStateCacheService {
@@ -78,6 +80,7 @@ impl EthStateCache {
action_tx: to_service.clone(),
action_rx: UnboundedReceiverStream::new(rx),
action_task_spawner,
rate_limiter: Arc::new(Semaphore::new(max_concurrent_db_operations)),
};
let cache = EthStateCache { to_service };
(cache, service)
@@ -107,9 +110,16 @@ impl EthStateCache {
Provider: StateProviderFactory + BlockReader + EvmEnvProvider + Clone + Unpin + 'static,
Tasks: TaskSpawner + Clone + 'static,
{
let EthStateCacheConfig { max_blocks, max_receipts, max_envs } = config;
let (this, service) =
Self::create(provider, executor.clone(), max_blocks, max_receipts, max_envs);
let EthStateCacheConfig { max_blocks, max_receipts, max_envs, max_concurrent_db_requests } =
config;
let (this, service) = Self::create(
provider,
executor.clone(),
max_blocks,
max_receipts,
max_envs,
max_concurrent_db_requests,
);
executor.spawn_critical("eth state cache", Box::pin(service));
this
}
@@ -229,6 +239,8 @@ pub(crate) struct EthStateCacheService<
action_rx: UnboundedReceiverStream<CacheAction>,
/// The type that's used to spawn tasks that do the actual work
action_task_spawner: Tasks,
/// Rate limiter
rate_limiter: Arc<Semaphore>,
}
impl<Provider, Tasks> EthStateCacheService<Provider, Tasks>
@@ -308,7 +320,10 @@ where
if this.full_block_cache.queue(block_hash, Either::Left(response_tx)) {
let provider = this.provider.clone();
let action_tx = this.action_tx.clone();
let rate_limiter = this.rate_limiter.clone();
this.action_task_spawner.spawn_blocking(Box::pin(async move {
// Acquire permit
let _permit = rate_limiter.acquire().await;
// Only look in the database to prevent situations where we
// looking up the tree is blocking
let res = provider
@@ -329,7 +344,10 @@ where
if this.full_block_cache.queue(block_hash, Either::Right(response_tx)) {
let provider = this.provider.clone();
let action_tx = this.action_tx.clone();
let rate_limiter = this.rate_limiter.clone();
this.action_task_spawner.spawn_blocking(Box::pin(async move {
// Acquire permit
let _permit = rate_limiter.acquire().await;
// Only look in the database to prevent situations where we
// looking up the tree is blocking
let res = provider
@@ -350,7 +368,10 @@ where
if this.receipts_cache.queue(block_hash, response_tx) {
let provider = this.provider.clone();
let action_tx = this.action_tx.clone();
let rate_limiter = this.rate_limiter.clone();
this.action_task_spawner.spawn_blocking(Box::pin(async move {
// Acquire permit
let _permit = rate_limiter.acquire().await;
let res = provider.receipts_by_block(block_hash.into());
let _ = action_tx
.send(CacheAction::ReceiptsResult { block_hash, res });
@@ -369,7 +390,10 @@ where
if this.evm_env_cache.queue(block_hash, response_tx) {
let provider = this.provider.clone();
let action_tx = this.action_tx.clone();
let rate_limiter = this.rate_limiter.clone();
this.action_task_spawner.spawn_blocking(Box::pin(async move {
// Acquire permit
let _permit = rate_limiter.acquire().await;
let mut cfg = CfgEnv::default();
let mut block_env = BlockEnv::default();
let res = provider