diff --git a/crates/rpc/rpc/src/eth/cache/config.rs b/crates/rpc/rpc/src/eth/cache/config.rs new file mode 100644 index 0000000000..da7037e72a --- /dev/null +++ b/crates/rpc/rpc/src/eth/cache/config.rs @@ -0,0 +1,50 @@ +use serde::{Deserialize, Serialize}; + +// TODO: memory based limiter is currently disabled pending further work +/// Default cache size for the block cache: 500MB +/// +/// With an average block size of ~100kb this should be able to cache ~5000 blocks. +pub const DEFAULT_BLOCK_CACHE_SIZE_BYTES_MB: usize = 500; + +/// Default cache size for the receipts cache: 500MB +pub const DEFAULT_RECEIPT_CACHE_SIZE_BYTES_MB: usize = 500; + +/// Default cache size for the env cache: 1MB +pub const DEFAULT_ENV_CACHE_SIZE_BYTES_MB: usize = 1; + +/// Default cache size for the block cache: 5000 blocks. +pub const DEFAULT_BLOCK_CACHE_MAX_LEN: u32 = 5000; + +/// Default cache size for the receipts cache: 2000 receipts. +pub const DEFAULT_RECEIPT_CACHE_MAX_LEN: u32 = 2000; + +/// Default cache size for the env cache: 1000 envs. +pub const DEFAULT_ENV_CACHE_MAX_LEN: u32 = 1000; + +/// Settings for the [EthStateCache](crate::eth::cache::EthStateCache). +#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct EthStateCacheConfig { + /// Max number of blocks in cache. + /// + /// Default is 5000. + pub max_blocks: u32, + /// Max number of receipts in cache. + /// + /// Default is 2000. + pub max_receipts: u32, + /// Max number of envs in cache. + /// + /// Default is 1000. 
+ pub max_envs: u32, +} + +impl Default for EthStateCacheConfig { + fn default() -> Self { + Self { + max_blocks: DEFAULT_BLOCK_CACHE_MAX_LEN, + max_receipts: DEFAULT_RECEIPT_CACHE_MAX_LEN, + max_envs: DEFAULT_ENV_CACHE_MAX_LEN, + } + } +} diff --git a/crates/rpc/rpc/src/eth/cache/metrics.rs b/crates/rpc/rpc/src/eth/cache/metrics.rs new file mode 100644 index 0000000000..eb1f092e80 --- /dev/null +++ b/crates/rpc/rpc/src/eth/cache/metrics.rs @@ -0,0 +1,13 @@ +use reth_metrics::{ + metrics::{self, Gauge}, + Metrics, +}; + +#[derive(Metrics)] +#[metrics(scope = "rpc.eth_cache")] +pub(crate) struct CacheMetrics { + /// The number of entities in the cache. + pub(crate) cached_count: Gauge, + /// The number of queued consumers. + pub(crate) queued_consumers_count: Gauge, +} diff --git a/crates/rpc/rpc/src/eth/cache.rs b/crates/rpc/rpc/src/eth/cache/mod.rs similarity index 81% rename from crates/rpc/rpc/src/eth/cache.rs rename to crates/rpc/rpc/src/eth/cache/mod.rs index 9a8c3a7ee8..f5e758f86b 100644 --- a/crates/rpc/rpc/src/eth/cache.rs +++ b/crates/rpc/rpc/src/eth/cache/mod.rs @@ -2,20 +2,13 @@ use futures::{future::Either, Stream, StreamExt}; use reth_interfaces::{provider::ProviderError, Result}; -use reth_metrics::{ - metrics::{self, Gauge}, - Metrics, -}; use reth_primitives::{Block, Receipt, SealedBlock, TransactionSigned, H256}; use reth_provider::{BlockReader, CanonStateNotification, EvmEnvProvider, StateProviderFactory}; use reth_tasks::{TaskSpawner, TokioTaskExecutor}; use revm::primitives::{BlockEnv, CfgEnv}; -use schnellru::{ByLength, Limiter, LruMap}; -use serde::{Deserialize, Serialize}; +use schnellru::{ByLength, Limiter}; use std::{ - collections::{hash_map::Entry, HashMap}, future::Future, - hash::Hash, pin::Pin, task::{ready, Context, Poll}, }; @@ -25,26 +18,13 @@ use tokio::sync::{ }; use tokio_stream::wrappers::UnboundedReceiverStream; -// TODO: memory based limiter is currently disabled pending -/// Default cache size for the block cache: 500MB -/// 
-/// With an average block size of ~100kb this should be able to cache ~5000 blocks. -pub const DEFAULT_BLOCK_CACHE_SIZE_BYTES_MB: usize = 500; +mod config; +pub use config::*; -/// Default cache size for the receipts cache: 500MB -pub const DEFAULT_RECEIPT_CACHE_SIZE_BYTES_MB: usize = 500; +mod metrics; -/// Default cache size for the env cache: 1MB -pub const DEFAULT_ENV_CACHE_SIZE_BYTES_MB: usize = 1; - -/// Default cache size for the block cache: 5000 blocks. -pub const DEFAULT_BLOCK_CACHE_MAX_LEN: u32 = 5000; - -/// Default cache size for the receipts cache: 2000 receipts. -pub const DEFAULT_RECEIPT_CACHE_MAX_LEN: u32 = 2000; - -/// Default cache size for the env cache: 1000 envs. -pub const DEFAULT_ENV_CACHE_MAX_LEN: u32 = 1000; +mod multi_consumer; +pub use multi_consumer::MultiConsumerLruCache; /// The type that can send the response to a requested [Block] type BlockResponseSender = oneshot::Sender>>; @@ -69,34 +49,6 @@ type ReceiptsLruCache = MultiConsumerLruCache, L, Receipts type EnvLruCache = MultiConsumerLruCache; -/// Settings for the [EthStateCache] -#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct EthStateCacheConfig { - /// Max number of blocks in cache. - /// - /// Default is 5000. - pub max_blocks: u32, - /// Max number receipts in cache. - /// - /// Default is 2000. - pub max_receipts: u32, - /// Max number of bytes for cached env data. - /// - /// Default is 1000. 
- pub max_envs: u32, -} - -impl Default for EthStateCacheConfig { - fn default() -> Self { - Self { - max_blocks: DEFAULT_BLOCK_CACHE_MAX_LEN, - max_receipts: DEFAULT_RECEIPT_CACHE_MAX_LEN, - max_envs: DEFAULT_ENV_CACHE_MAX_LEN, - } - } -} - /// Provides async access to cached eth data /// /// This is the frontend for the async caching service which manages cached data on a different @@ -301,7 +253,7 @@ where // cache good block if let Ok(Some(block)) = res { - self.full_block_cache.cache.insert(block_hash, block); + self.full_block_cache.insert(block_hash, block); } } @@ -315,7 +267,7 @@ where // cache good receipts if let Ok(Some(receipts)) = res { - self.receipts_cache.cache.insert(block_hash, receipts); + self.receipts_cache.insert(block_hash, receipts); } } @@ -345,9 +297,7 @@ where match action { CacheAction::GetBlock { block_hash, response_tx } => { // check if block is cached - if let Some(block) = - this.full_block_cache.cache.get(&block_hash).cloned() - { + if let Some(block) = this.full_block_cache.get(&block_hash).cloned() { let _ = response_tx.send(Ok(Some(block))); continue } @@ -365,7 +315,7 @@ where } CacheAction::GetBlockTransactions { block_hash, response_tx } => { // check if block is cached - if let Some(block) = this.full_block_cache.cache.get(&block_hash) { + if let Some(block) = this.full_block_cache.get(&block_hash) { let _ = response_tx.send(Ok(Some(block.body.clone()))); continue } @@ -383,9 +333,7 @@ where } CacheAction::GetReceipts { block_hash, response_tx } => { // check if block is cached - if let Some(receipts) = - this.receipts_cache.cache.get(&block_hash).cloned() - { + if let Some(receipts) = this.receipts_cache.get(&block_hash).cloned() { let _ = response_tx.send(Ok(Some(receipts))); continue } @@ -403,7 +351,7 @@ where } CacheAction::GetEnv { block_hash, response_tx } => { // check if env data is cached - if let Some(env) = this.evm_env_cache.cache.get(&block_hash).cloned() { + if let Some(env) = 
this.evm_env_cache.get(&block_hash).cloned() { let _ = response_tx.send(Ok(env)); continue } @@ -443,7 +391,7 @@ where // cache good env data if let Ok(data) = res { - this.evm_env_cache.cache.insert(block_hash, data); + this.evm_env_cache.insert(block_hash, data); } } CacheAction::CacheNewCanonicalChain { blocks, receipts } => { @@ -466,72 +414,6 @@ where } } -struct MultiConsumerLruCache -where - K: Hash + Eq, - L: Limiter, -{ - /// The LRU cache for the - cache: LruMap, - /// All queued consumers - queued: HashMap>, - /// Cache metrics - metrics: CacheMetrics, -} - -impl MultiConsumerLruCache -where - K: Hash + Eq, - L: Limiter, -{ - /// Adds the sender to the queue for the given key. - /// - /// Returns true if this is the first queued sender for the key - fn queue(&mut self, key: K, sender: S) -> bool { - self.metrics.queued_consumers_count.increment(1.0); - match self.queued.entry(key) { - Entry::Occupied(mut entry) => { - entry.get_mut().push(sender); - false - } - Entry::Vacant(entry) => { - entry.insert(vec![sender]); - true - } - } - } - - /// Remove consumers for a given key. - fn remove(&mut self, key: &K) -> Option> { - match self.queued.remove(key) { - Some(removed) => { - self.metrics.queued_consumers_count.decrement(removed.len() as f64); - Some(removed) - } - None => None, - } - } - - #[inline] - fn update_cached_metrics(&self) { - self.metrics.cached_count.set(self.cache.len() as f64); - } -} - -impl MultiConsumerLruCache -where - K: Hash + Eq, -{ - /// Creates a new empty map with a given `memory_budget` and metric label. 
- fn new(max_len: u32, cache_id: &str) -> Self { - Self { - cache: LruMap::new(ByLength::new(max_len)), - queued: Default::default(), - metrics: CacheMetrics::new_with_labels(&[("cache", cache_id.to_string())]), - } - } -} - /// All message variants sent through the channel enum CacheAction { GetBlock { block_hash: H256, response_tx: BlockResponseSender }, @@ -578,12 +460,3 @@ where } } } - -#[derive(Metrics)] -#[metrics(scope = "rpc.eth_cache")] -struct CacheMetrics { - /// The number of entities in the cache. - cached_count: Gauge, - /// The number of queued consumers. - queued_consumers_count: Gauge, -} diff --git a/crates/rpc/rpc/src/eth/cache/multi_consumer.rs b/crates/rpc/rpc/src/eth/cache/multi_consumer.rs new file mode 100644 index 0000000000..32840dc8e3 --- /dev/null +++ b/crates/rpc/rpc/src/eth/cache/multi_consumer.rs @@ -0,0 +1,107 @@ +use super::metrics::CacheMetrics; +use schnellru::{ByLength, Limiter, LruMap}; +use std::{ + collections::{hash_map::Entry, HashMap}, + fmt::{self, Debug, Formatter}, + hash::Hash, +}; + +/// A multi-consumer LRU cache. +pub struct MultiConsumerLruCache +where + K: Hash + Eq, + L: Limiter, +{ + /// The LRU cache for the values + cache: LruMap, + /// All queued consumers + queued: HashMap>, + /// Cache metrics + metrics: CacheMetrics, +} + +impl Debug for MultiConsumerLruCache +where + K: Hash + Eq, + L: Limiter, +{ + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + f.debug_struct("MultiConsumerLruCache") + .field("cache_length", &self.cache.len()) + .field("cache_memory_usage", &self.cache.memory_usage()) + .field("queued_length", &self.queued.len()) + .finish() + } +} + +impl MultiConsumerLruCache +where + K: Hash + Eq + Debug, + L: Limiter, +{ + /// Adds the sender to the queue for the given key. 
+ /// + /// Returns true if this is the first queued sender for the key + pub fn queue(&mut self, key: K, sender: S) -> bool { + self.metrics.queued_consumers_count.increment(1.0); + match self.queued.entry(key) { + Entry::Occupied(mut entry) => { + entry.get_mut().push(sender); + false + } + Entry::Vacant(entry) => { + entry.insert(vec![sender]); + true + } + } + } + + /// Remove consumers for a given key. + pub fn remove(&mut self, key: &K) -> Option> { + match self.queued.remove(key) { + Some(removed) => { + self.metrics.queued_consumers_count.decrement(removed.len() as f64); + Some(removed) + } + None => None, + } + } + + /// Returns a reference to the value for a given key and promotes that element to be the most + /// recently used. + pub fn get(&mut self, key: &K) -> Option<&mut V> { + self.cache.get(key) + } + + /// Inserts a new element into the map. + /// + /// Can fail if the element is rejected by the limiter or if we fail to grow an empty map. + /// + /// See [Schnellru::insert](LruMap::insert) for more info. + pub fn insert<'a>(&mut self, key: L::KeyToInsert<'a>, value: V) -> bool + where + L::KeyToInsert<'a>: Hash + PartialEq, + { + self.cache.insert(key, value) + } + + /// Update metrics for the inner cache. + #[inline] + pub fn update_cached_metrics(&self) { + self.metrics.cached_count.set(self.cache.len() as f64); + } +} + +impl MultiConsumerLruCache +where + K: Hash + Eq, +{ + /// Creates a new empty map with a given `max_len` and metric label. + pub fn new(max_len: u32, cache_id: &str) -> Self { + Self { + cache: LruMap::new(ByLength::new(max_len)), + queued: Default::default(), + metrics: CacheMetrics::new_with_labels(&[("cache", cache_id.to_string())]), + } + } +}