mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-01-30 01:28:21 -05:00
fix(rpc): limit cache by length (#3505)
This commit is contained in:
@@ -13,8 +13,7 @@ use reth_provider::{
|
||||
use reth_rpc::{
|
||||
eth::{
|
||||
cache::{
|
||||
DEFAULT_BLOCK_CACHE_SIZE_BYTES_MB, DEFAULT_ENV_CACHE_SIZE_BYTES_MB,
|
||||
DEFAULT_RECEIPT_CACHE_SIZE_BYTES_MB,
|
||||
DEFAULT_BLOCK_CACHE_MAX_LEN, DEFAULT_ENV_CACHE_MAX_LEN, DEFAULT_RECEIPT_CACHE_MAX_LEN,
|
||||
},
|
||||
gas_oracle::GasPriceOracleConfig,
|
||||
},
|
||||
@@ -136,17 +135,17 @@ pub struct RpcServerArgs {
|
||||
#[clap(flatten)]
|
||||
pub gas_price_oracle: GasPriceOracleArgs,
|
||||
|
||||
/// Max size for cached block data in megabytes.
|
||||
#[arg(long, default_value_t = DEFAULT_BLOCK_CACHE_SIZE_BYTES_MB)]
|
||||
pub block_cache_size: usize,
|
||||
/// Maximum number of block cache entries.
|
||||
#[arg(long, default_value_t = DEFAULT_BLOCK_CACHE_MAX_LEN)]
|
||||
pub block_cache_len: u32,
|
||||
|
||||
/// Max size for cached receipt data in megabytes.
|
||||
#[arg(long, default_value_t = DEFAULT_RECEIPT_CACHE_SIZE_BYTES_MB)]
|
||||
pub receipt_cache_size: usize,
|
||||
/// Maximum number of receipt cache entries.
|
||||
#[arg(long, default_value_t = DEFAULT_RECEIPT_CACHE_MAX_LEN)]
|
||||
pub receipt_cache_len: u32,
|
||||
|
||||
/// Max size for cached evm env data in megabytes.
|
||||
#[arg(long, default_value_t = DEFAULT_ENV_CACHE_SIZE_BYTES_MB)]
|
||||
pub env_cache_size: usize,
|
||||
/// Maximum number of env cache entries.
|
||||
#[arg(long, default_value_t = DEFAULT_ENV_CACHE_MAX_LEN)]
|
||||
pub env_cache_len: u32,
|
||||
}
|
||||
|
||||
impl RpcServerArgs {
|
||||
@@ -160,21 +159,6 @@ impl RpcServerArgs {
|
||||
self.rpc_max_response_size * 1024 * 1024
|
||||
}
|
||||
|
||||
/// Returns the max number of bytes for cached block data in bytes
|
||||
pub fn block_cache_size_bytes(&self) -> usize {
|
||||
self.block_cache_size * 1024 * 1024
|
||||
}
|
||||
|
||||
/// Returns the max number of bytes for cached receipt data in bytes
|
||||
pub fn receipt_cache_size_bytes(&self) -> usize {
|
||||
self.receipt_cache_size * 1024 * 1024
|
||||
}
|
||||
|
||||
/// Returns the max number of bytes for cached evm env data in bytes
|
||||
pub fn env_cache_size_bytes(&self) -> usize {
|
||||
self.env_cache_size * 1024 * 1024
|
||||
}
|
||||
|
||||
/// Extracts the gas price oracle config from the args.
|
||||
pub fn gas_price_oracle_config(&self) -> GasPriceOracleConfig {
|
||||
GasPriceOracleConfig::new(
|
||||
|
||||
@@ -10,7 +10,7 @@ use reth_primitives::{Block, Receipt, SealedBlock, TransactionSigned, H256};
|
||||
use reth_provider::{BlockReader, CanonStateNotification, EvmEnvProvider, StateProviderFactory};
|
||||
use reth_tasks::{TaskSpawner, TokioTaskExecutor};
|
||||
use revm::primitives::{BlockEnv, CfgEnv};
|
||||
use schnellru::{ByMemoryUsage, Limiter, LruMap};
|
||||
use schnellru::{ByLength, Limiter, LruMap};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::{
|
||||
collections::{hash_map::Entry, HashMap},
|
||||
@@ -25,6 +25,7 @@ use tokio::sync::{
|
||||
};
|
||||
use tokio_stream::wrappers::UnboundedReceiverStream;
|
||||
|
||||
// TODO: memory based limiter is currently disabled pending <https://github.com/paradigmxyz/reth/issues/3503>
|
||||
/// Default cache size for the block cache: 500MB
|
||||
///
|
||||
/// With an average block size of ~100kb this should be able to cache ~5000 blocks.
|
||||
@@ -36,6 +37,15 @@ pub const DEFAULT_RECEIPT_CACHE_SIZE_BYTES_MB: usize = 500;
|
||||
/// Default cache size for the env cache: 1MB
|
||||
pub const DEFAULT_ENV_CACHE_SIZE_BYTES_MB: usize = 1;
|
||||
|
||||
/// Default cache size for the block cache: 5000 blocks.
|
||||
pub const DEFAULT_BLOCK_CACHE_MAX_LEN: u32 = 5000;
|
||||
|
||||
/// Default cache size for the receipts cache: 2000 receipts.
|
||||
pub const DEFAULT_RECEIPT_CACHE_MAX_LEN: u32 = 2000;
|
||||
|
||||
/// Default cache size for the env cache: 1000 envs.
|
||||
pub const DEFAULT_ENV_CACHE_MAX_LEN: u32 = 1000;
|
||||
|
||||
/// The type that can send the response to a requested [Block]
|
||||
type BlockResponseSender = oneshot::Sender<Result<Option<Block>>>;
|
||||
|
||||
@@ -63,26 +73,26 @@ type EnvLruCache<L> = MultiConsumerLruCache<H256, (CfgEnv, BlockEnv), L, EnvResp
|
||||
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct EthStateCacheConfig {
|
||||
/// Max number of bytes for cached block data.
|
||||
/// Max number of blocks in cache.
|
||||
///
|
||||
/// Default is 500MB
|
||||
pub max_block_bytes: usize,
|
||||
/// Max number of bytes for cached receipt data.
|
||||
/// Default is 5000.
|
||||
pub max_blocks: u32,
|
||||
/// Max number receipts in cache.
|
||||
///
|
||||
/// Default is 500MB
|
||||
pub max_receipt_bytes: usize,
|
||||
/// Default is 2000.
|
||||
pub max_receipts: u32,
|
||||
/// Max number of bytes for cached env data.
|
||||
///
|
||||
/// Default is 1MB (env configs are very small)
|
||||
pub max_env_bytes: usize,
|
||||
/// Default is 1000.
|
||||
pub max_envs: u32,
|
||||
}
|
||||
|
||||
impl Default for EthStateCacheConfig {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
max_block_bytes: DEFAULT_BLOCK_CACHE_SIZE_BYTES_MB * 1024 * 1024,
|
||||
max_receipt_bytes: DEFAULT_RECEIPT_CACHE_SIZE_BYTES_MB * 1024 * 1024,
|
||||
max_env_bytes: DEFAULT_ENV_CACHE_SIZE_BYTES_MB * 1024 * 1024,
|
||||
max_blocks: DEFAULT_BLOCK_CACHE_MAX_LEN,
|
||||
max_receipts: DEFAULT_RECEIPT_CACHE_MAX_LEN,
|
||||
max_envs: DEFAULT_ENV_CACHE_MAX_LEN,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -101,16 +111,16 @@ impl EthStateCache {
|
||||
fn create<Provider, Tasks>(
|
||||
provider: Provider,
|
||||
action_task_spawner: Tasks,
|
||||
max_block_bytes: usize,
|
||||
max_receipt_bytes: usize,
|
||||
max_env_bytes: usize,
|
||||
max_blocks: u32,
|
||||
max_receipts: u32,
|
||||
max_envs: u32,
|
||||
) -> (Self, EthStateCacheService<Provider, Tasks>) {
|
||||
let (to_service, rx) = unbounded_channel();
|
||||
let service = EthStateCacheService {
|
||||
provider,
|
||||
full_block_cache: BlockLruCache::new(max_block_bytes, "blocks"),
|
||||
receipts_cache: ReceiptsLruCache::new(max_receipt_bytes, "receipts"),
|
||||
evm_env_cache: EnvLruCache::new(max_env_bytes, "evm_env"),
|
||||
full_block_cache: BlockLruCache::new(max_blocks, "blocks"),
|
||||
receipts_cache: ReceiptsLruCache::new(max_receipts, "receipts"),
|
||||
evm_env_cache: EnvLruCache::new(max_envs, "evm_env"),
|
||||
action_tx: to_service.clone(),
|
||||
action_rx: UnboundedReceiverStream::new(rx),
|
||||
action_task_spawner,
|
||||
@@ -143,14 +153,9 @@ impl EthStateCache {
|
||||
Provider: StateProviderFactory + BlockReader + EvmEnvProvider + Clone + Unpin + 'static,
|
||||
Tasks: TaskSpawner + Clone + 'static,
|
||||
{
|
||||
let EthStateCacheConfig { max_block_bytes, max_receipt_bytes, max_env_bytes } = config;
|
||||
let (this, service) = Self::create(
|
||||
provider,
|
||||
executor.clone(),
|
||||
max_block_bytes,
|
||||
max_receipt_bytes,
|
||||
max_env_bytes,
|
||||
);
|
||||
let EthStateCacheConfig { max_blocks, max_receipts, max_envs } = config;
|
||||
let (this, service) =
|
||||
Self::create(provider, executor.clone(), max_blocks, max_receipts, max_envs);
|
||||
executor.spawn_critical("eth state cache", Box::pin(service));
|
||||
this
|
||||
}
|
||||
@@ -248,9 +253,9 @@ impl EthStateCache {
|
||||
pub(crate) struct EthStateCacheService<
|
||||
Provider,
|
||||
Tasks,
|
||||
LimitBlocks = ByMemoryUsage,
|
||||
LimitReceipts = ByMemoryUsage,
|
||||
LimitEnvs = ByMemoryUsage,
|
||||
LimitBlocks = ByLength,
|
||||
LimitReceipts = ByLength,
|
||||
LimitEnvs = ByLength,
|
||||
> where
|
||||
LimitBlocks: Limiter<H256, Block>,
|
||||
LimitReceipts: Limiter<H256, Vec<Receipt>>,
|
||||
@@ -513,16 +518,14 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
impl<K, V, S> MultiConsumerLruCache<K, V, ByMemoryUsage, S>
|
||||
impl<K, V, S> MultiConsumerLruCache<K, V, ByLength, S>
|
||||
where
|
||||
K: Hash + Eq,
|
||||
{
|
||||
/// Creates a new empty map with a given `memory_budget` and metric label.
|
||||
///
|
||||
/// See also [LruMap::with_memory_budget]
|
||||
fn new(memory_budget: usize, cache_id: &str) -> Self {
|
||||
fn new(max_len: u32, cache_id: &str) -> Self {
|
||||
Self {
|
||||
cache: LruMap::with_memory_budget(memory_budget),
|
||||
cache: LruMap::new(ByLength::new(max_len)),
|
||||
queued: Default::default(),
|
||||
metrics: CacheMetrics::new_with_labels(&[("cache", cache_id.to_string())]),
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user