From 8be06843e468a5389e20082ffcfc09e8edafa208 Mon Sep 17 00:00:00 2001 From: Roman Krasiuk Date: Fri, 30 Jun 2023 21:01:50 +0300 Subject: [PATCH] fix(rpc): limit cache by length (#3505) --- bin/reth/src/args/rpc_server_args.rs | 36 ++++---------- crates/rpc/rpc/src/eth/cache.rs | 71 +++++++++++++++------------- 2 files changed, 47 insertions(+), 60 deletions(-) diff --git a/bin/reth/src/args/rpc_server_args.rs b/bin/reth/src/args/rpc_server_args.rs index c08aebe0e5..bcf10cd037 100644 --- a/bin/reth/src/args/rpc_server_args.rs +++ b/bin/reth/src/args/rpc_server_args.rs @@ -13,8 +13,7 @@ use reth_provider::{ use reth_rpc::{ eth::{ cache::{ - DEFAULT_BLOCK_CACHE_SIZE_BYTES_MB, DEFAULT_ENV_CACHE_SIZE_BYTES_MB, - DEFAULT_RECEIPT_CACHE_SIZE_BYTES_MB, + DEFAULT_BLOCK_CACHE_MAX_LEN, DEFAULT_ENV_CACHE_MAX_LEN, DEFAULT_RECEIPT_CACHE_MAX_LEN, }, gas_oracle::GasPriceOracleConfig, }, @@ -136,17 +135,17 @@ pub struct RpcServerArgs { #[clap(flatten)] pub gas_price_oracle: GasPriceOracleArgs, - /// Max size for cached block data in megabytes. - #[arg(long, default_value_t = DEFAULT_BLOCK_CACHE_SIZE_BYTES_MB)] - pub block_cache_size: usize, + /// Maximum number of block cache entries. + #[arg(long, default_value_t = DEFAULT_BLOCK_CACHE_MAX_LEN)] + pub block_cache_len: u32, - /// Max size for cached receipt data in megabytes. - #[arg(long, default_value_t = DEFAULT_RECEIPT_CACHE_SIZE_BYTES_MB)] - pub receipt_cache_size: usize, + /// Maximum number of receipt cache entries. + #[arg(long, default_value_t = DEFAULT_RECEIPT_CACHE_MAX_LEN)] + pub receipt_cache_len: u32, - /// Max size for cached evm env data in megabytes. - #[arg(long, default_value_t = DEFAULT_ENV_CACHE_SIZE_BYTES_MB)] - pub env_cache_size: usize, + /// Maximum number of env cache entries. + #[arg(long, default_value_t = DEFAULT_ENV_CACHE_MAX_LEN)] + pub env_cache_len: u32, } impl RpcServerArgs { @@ -160,21 +159,6 @@ impl RpcServerArgs { self.rpc_max_response_size * 1024 * 1024 } - /// Returns the max number of bytes for cached block data in bytes - pub fn block_cache_size_bytes(&self) -> usize { - self.block_cache_size * 1024 * 1024 - } - - /// Returns the max number of bytes for cached receipt data in bytes - pub fn receipt_cache_size_bytes(&self) -> usize { - self.receipt_cache_size * 1024 * 1024 - } - - /// Returns the max number of bytes for cached evm env data in bytes - pub fn env_cache_size_bytes(&self) -> usize { - self.env_cache_size * 1024 * 1024 - } - /// Extracts the gas price oracle config from the args. pub fn gas_price_oracle_config(&self) -> GasPriceOracleConfig { GasPriceOracleConfig::new( diff --git a/crates/rpc/rpc/src/eth/cache.rs b/crates/rpc/rpc/src/eth/cache.rs index ffa0e4ec44..b23a677cf5 100644 --- a/crates/rpc/rpc/src/eth/cache.rs +++ b/crates/rpc/rpc/src/eth/cache.rs @@ -10,7 +10,7 @@ use reth_primitives::{Block, Receipt, SealedBlock, TransactionSigned, H256}; use reth_provider::{BlockReader, CanonStateNotification, EvmEnvProvider, StateProviderFactory}; use reth_tasks::{TaskSpawner, TokioTaskExecutor}; use revm::primitives::{BlockEnv, CfgEnv}; -use schnellru::{ByMemoryUsage, Limiter, LruMap}; +use schnellru::{ByLength, Limiter, LruMap}; use serde::{Deserialize, Serialize}; use std::{ collections::{hash_map::Entry, HashMap}, @@ -25,6 +25,7 @@ use tokio::sync::{ }; use tokio_stream::wrappers::UnboundedReceiverStream; +// TODO: memory based limiter is currently disabled pending /// Default cache size for the block cache: 500MB /// /// With an average block size of ~100kb this should be able to cache ~5000 blocks. @@ -36,6 +37,15 @@ pub const DEFAULT_RECEIPT_CACHE_SIZE_BYTES_MB: usize = 500; /// Default cache size for the env cache: 1MB pub const DEFAULT_ENV_CACHE_SIZE_BYTES_MB: usize = 1; +/// Default cache size for the block cache: 5000 blocks. +pub const DEFAULT_BLOCK_CACHE_MAX_LEN: u32 = 5000; + +/// Default cache size for the receipts cache: 2000 receipts. +pub const DEFAULT_RECEIPT_CACHE_MAX_LEN: u32 = 2000; + +/// Default cache size for the env cache: 1000 envs. +pub const DEFAULT_ENV_CACHE_MAX_LEN: u32 = 1000; + /// The type that can send the response to a requested [Block] type BlockResponseSender = oneshot::Sender>>; @@ -63,26 +73,26 @@ type EnvLruCache = MultiConsumerLruCache Self { Self { - max_block_bytes: DEFAULT_BLOCK_CACHE_SIZE_BYTES_MB * 1024 * 1024, - max_receipt_bytes: DEFAULT_RECEIPT_CACHE_SIZE_BYTES_MB * 1024 * 1024, - max_env_bytes: DEFAULT_ENV_CACHE_SIZE_BYTES_MB * 1024 * 1024, + max_blocks: DEFAULT_BLOCK_CACHE_MAX_LEN, + max_receipts: DEFAULT_RECEIPT_CACHE_MAX_LEN, + max_envs: DEFAULT_ENV_CACHE_MAX_LEN, } } } @@ -101,16 +111,16 @@ impl EthStateCache { fn create( provider: Provider, action_task_spawner: Tasks, - max_block_bytes: usize, - max_receipt_bytes: usize, - max_env_bytes: usize, + max_blocks: u32, + max_receipts: u32, + max_envs: u32, ) -> (Self, EthStateCacheService) { let (to_service, rx) = unbounded_channel(); let service = EthStateCacheService { provider, - full_block_cache: BlockLruCache::new(max_block_bytes, "blocks"), - receipts_cache: ReceiptsLruCache::new(max_receipt_bytes, "receipts"), - evm_env_cache: EnvLruCache::new(max_env_bytes, "evm_env"), + full_block_cache: BlockLruCache::new(max_blocks, "blocks"), + receipts_cache: ReceiptsLruCache::new(max_receipts, "receipts"), + evm_env_cache: EnvLruCache::new(max_envs, "evm_env"), action_tx: to_service.clone(), action_rx: UnboundedReceiverStream::new(rx), action_task_spawner, @@ -143,14 +153,9 @@ impl EthStateCache { Provider: StateProviderFactory + BlockReader + EvmEnvProvider + Clone + Unpin + 'static, Tasks: TaskSpawner + Clone + 'static, { - let EthStateCacheConfig { max_block_bytes, max_receipt_bytes, max_env_bytes } = config; - let (this, service) = Self::create( - provider, - executor.clone(), - max_block_bytes, - max_receipt_bytes, - max_env_bytes, - ); + let EthStateCacheConfig { max_blocks, max_receipts, max_envs } = config; + let (this, service) = + Self::create(provider, executor.clone(), max_blocks, max_receipts, max_envs); executor.spawn_critical("eth state cache", Box::pin(service)); this } @@ -248,9 +253,9 @@ impl EthStateCache { pub(crate) struct EthStateCacheService< Provider, Tasks, - LimitBlocks = ByMemoryUsage, - LimitReceipts = ByMemoryUsage, - LimitEnvs = ByMemoryUsage, + LimitBlocks = ByLength, + LimitReceipts = ByLength, + LimitEnvs = ByLength, > where LimitBlocks: Limiter, LimitReceipts: Limiter>, @@ -513,16 +518,14 @@ where } } -impl MultiConsumerLruCache +impl MultiConsumerLruCache where K: Hash + Eq, { /// Creates a new empty map with a given `memory_budget` and metric label. - /// - /// See also [LruMap::with_memory_budget] - fn new(memory_budget: usize, cache_id: &str) -> Self { + fn new(max_len: u32, cache_id: &str) -> Self { Self { - cache: LruMap::with_memory_budget(memory_budget), + cache: LruMap::new(ByLength::new(max_len)), queued: Default::default(), metrics: CacheMetrics::new_with_labels(&[("cache", cache_id.to_string())]), }