diff --git a/Cargo.lock b/Cargo.lock index 3e4dd33a52..dc6fb0b656 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5651,6 +5651,7 @@ dependencies = [ "pin-project", "rand 0.8.5", "reth-interfaces", + "reth-metrics", "reth-network-api", "reth-primitives", "reth-provider", diff --git a/crates/rpc/rpc/Cargo.toml b/crates/rpc/rpc/Cargo.toml index 796c5ded0e..2b3d1631ab 100644 --- a/crates/rpc/rpc/Cargo.toml +++ b/crates/rpc/rpc/Cargo.toml @@ -22,6 +22,7 @@ reth-network-api = { workspace = true, features = ["test-utils"] } reth-rpc-engine-api = { path = "../rpc-engine-api" } reth-revm = { path = "../../revm" } reth-tasks = { workspace = true } +reth-metrics = { workspace = true } # eth revm = { workspace = true, features = [ diff --git a/crates/rpc/rpc/src/eth/cache.rs b/crates/rpc/rpc/src/eth/cache.rs index 5e84fcd693..af0858cb4b 100644 --- a/crates/rpc/rpc/src/eth/cache.rs +++ b/crates/rpc/rpc/src/eth/cache.rs @@ -2,6 +2,10 @@ use futures::{future::Either, Stream, StreamExt}; use reth_interfaces::{provider::ProviderError, Result}; +use reth_metrics::{ + metrics::{self, Gauge}, + Metrics, +}; use reth_primitives::{Block, Receipt, SealedBlock, TransactionSigned, H256}; use reth_provider::{BlockReader, CanonStateNotification, EvmEnvProvider, StateProviderFactory}; use reth_tasks::{TaskSpawner, TokioTaskExecutor}; @@ -104,9 +108,9 @@ impl EthStateCache { let (to_service, rx) = unbounded_channel(); let service = EthStateCacheService { provider, - full_block_cache: BlockLruCache::with_memory_budget(max_block_bytes), - receipts_cache: ReceiptsLruCache::with_memory_budget(max_receipt_bytes), - evm_env_cache: EnvLruCache::with_memory_budget(max_env_bytes), + full_block_cache: BlockLruCache::new(max_block_bytes, "blocks"), + receipts_cache: ReceiptsLruCache::new(max_receipt_bytes, "receipts"), + evm_env_cache: EnvLruCache::new(max_env_bytes, "evm_env"), action_tx: to_service.clone(), action_rx: UnboundedReceiverStream::new(rx), action_task_spawner, @@ -274,7 +278,7 @@ where Tasks: TaskSpawner + Clone + 'static, { fn on_new_block(&mut self, block_hash: H256, res: Result>) { - if let Some(queued) = self.full_block_cache.queued.remove(&block_hash) { + if let Some(queued) = self.full_block_cache.remove(&block_hash) { // send the response to queued senders for tx in queued { match tx { @@ -297,7 +301,7 @@ where } fn on_new_receipts(&mut self, block_hash: H256, res: Result>>) { - if let Some(queued) = self.receipts_cache.queued.remove(&block_hash) { + if let Some(queued) = self.receipts_cache.remove(&block_hash) { // send the response to queued senders for tx in queued { let _ = tx.send(res.clone()); @@ -309,6 +313,12 @@ where self.receipts_cache.cache.insert(block_hash, receipts); } } + + fn update_cached_metrics(&self) { + self.full_block_cache.update_cached_count(); + self.receipts_cache.update_cached_count(); + self.evm_env_cache.update_cached_count(); + } } impl Future for EthStateCacheService @@ -419,7 +429,7 @@ where } CacheAction::EnvResult { block_hash, res } => { let res = *res; - if let Some(queued) = this.evm_env_cache.queued.remove(&block_hash) { + if let Some(queued) = this.evm_env_cache.remove(&block_hash) { // send the response to queued senders for tx in queued { let _ = tx.send(res.clone()); @@ -443,7 +453,8 @@ where ); } } - } + }; + this.update_cached_metrics(); } } } @@ -459,6 +470,8 @@ where cache: LruMap, /// All queued consumers queued: HashMap>, + /// Cache metrics + metrics: CacheMetrics, } impl MultiConsumerLruCache @@ -470,6 +483,7 @@ where /// /// Returns true if this is the first queued sender for the key fn queue(&mut self, key: K, sender: S) -> bool { + self.metrics.queued_consumers_count.increment(1.0); match self.queued.entry(key) { Entry::Occupied(mut entry) => { entry.get_mut().push(sender); @@ -481,17 +495,36 @@ where } } } + + /// Remove consumers for a given key. + fn remove(&mut self, key: &K) -> Option> { + match self.queued.remove(key) { + Some(removed) => { + self.metrics.queued_consumers_count.decrement(removed.len() as f64); + Some(removed) + } + None => None, + } + } + + fn update_cached_count(&self) { + self.metrics.cached_count.set(self.cache.len() as f64); + } } impl MultiConsumerLruCache where K: Hash + Eq, { - /// Creates a new empty map with a given `memory_budget`. + /// Creates a new empty map with a given `memory_budget` and metric label. /// /// See also [LruMap::with_memory_budget] - fn with_memory_budget(memory_budget: usize) -> Self { - Self { cache: LruMap::with_memory_budget(memory_budget), queued: Default::default() } + fn new(memory_budget: usize, cache_id: &str) -> Self { + Self { + cache: LruMap::with_memory_budget(memory_budget), + queued: Default::default(), + metrics: CacheMetrics::new_with_labels(&[("cache", cache_id.to_string())]), + } } } @@ -541,3 +574,12 @@ where } } } + +#[derive(Metrics)] +#[metrics(scope = "rpc.eth_cache")] +struct CacheMetrics { + /// The number of entities in the cache. + cached_count: Gauge, + /// The number of queued consumers. + queued_consumers_count: Gauge, +}