mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-01-29 09:08:05 -05:00
feat(rpc): cache metrics (#3499)
This commit is contained in:
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -5651,6 +5651,7 @@ dependencies = [
|
||||
"pin-project",
|
||||
"rand 0.8.5",
|
||||
"reth-interfaces",
|
||||
"reth-metrics",
|
||||
"reth-network-api",
|
||||
"reth-primitives",
|
||||
"reth-provider",
|
||||
|
||||
@@ -22,6 +22,7 @@ reth-network-api = { workspace = true, features = ["test-utils"] }
|
||||
reth-rpc-engine-api = { path = "../rpc-engine-api" }
|
||||
reth-revm = { path = "../../revm" }
|
||||
reth-tasks = { workspace = true }
|
||||
reth-metrics = { workspace = true }
|
||||
|
||||
# eth
|
||||
revm = { workspace = true, features = [
|
||||
|
||||
@@ -2,6 +2,10 @@
|
||||
|
||||
use futures::{future::Either, Stream, StreamExt};
|
||||
use reth_interfaces::{provider::ProviderError, Result};
|
||||
use reth_metrics::{
|
||||
metrics::{self, Gauge},
|
||||
Metrics,
|
||||
};
|
||||
use reth_primitives::{Block, Receipt, SealedBlock, TransactionSigned, H256};
|
||||
use reth_provider::{BlockReader, CanonStateNotification, EvmEnvProvider, StateProviderFactory};
|
||||
use reth_tasks::{TaskSpawner, TokioTaskExecutor};
|
||||
@@ -104,9 +108,9 @@ impl EthStateCache {
|
||||
let (to_service, rx) = unbounded_channel();
|
||||
let service = EthStateCacheService {
|
||||
provider,
|
||||
full_block_cache: BlockLruCache::with_memory_budget(max_block_bytes),
|
||||
receipts_cache: ReceiptsLruCache::with_memory_budget(max_receipt_bytes),
|
||||
evm_env_cache: EnvLruCache::with_memory_budget(max_env_bytes),
|
||||
full_block_cache: BlockLruCache::new(max_block_bytes, "blocks"),
|
||||
receipts_cache: ReceiptsLruCache::new(max_receipt_bytes, "receipts"),
|
||||
evm_env_cache: EnvLruCache::new(max_env_bytes, "evm_env"),
|
||||
action_tx: to_service.clone(),
|
||||
action_rx: UnboundedReceiverStream::new(rx),
|
||||
action_task_spawner,
|
||||
@@ -274,7 +278,7 @@ where
|
||||
Tasks: TaskSpawner + Clone + 'static,
|
||||
{
|
||||
fn on_new_block(&mut self, block_hash: H256, res: Result<Option<Block>>) {
|
||||
if let Some(queued) = self.full_block_cache.queued.remove(&block_hash) {
|
||||
if let Some(queued) = self.full_block_cache.remove(&block_hash) {
|
||||
// send the response to queued senders
|
||||
for tx in queued {
|
||||
match tx {
|
||||
@@ -297,7 +301,7 @@ where
|
||||
}
|
||||
|
||||
fn on_new_receipts(&mut self, block_hash: H256, res: Result<Option<Vec<Receipt>>>) {
|
||||
if let Some(queued) = self.receipts_cache.queued.remove(&block_hash) {
|
||||
if let Some(queued) = self.receipts_cache.remove(&block_hash) {
|
||||
// send the response to queued senders
|
||||
for tx in queued {
|
||||
let _ = tx.send(res.clone());
|
||||
@@ -309,6 +313,12 @@ where
|
||||
self.receipts_cache.cache.insert(block_hash, receipts);
|
||||
}
|
||||
}
|
||||
|
||||
fn update_cached_metrics(&self) {
|
||||
self.full_block_cache.update_cached_count();
|
||||
self.receipts_cache.update_cached_count();
|
||||
self.evm_env_cache.update_cached_count();
|
||||
}
|
||||
}
|
||||
|
||||
impl<Provider, Tasks> Future for EthStateCacheService<Provider, Tasks>
|
||||
@@ -419,7 +429,7 @@ where
|
||||
}
|
||||
CacheAction::EnvResult { block_hash, res } => {
|
||||
let res = *res;
|
||||
if let Some(queued) = this.evm_env_cache.queued.remove(&block_hash) {
|
||||
if let Some(queued) = this.evm_env_cache.remove(&block_hash) {
|
||||
// send the response to queued senders
|
||||
for tx in queued {
|
||||
let _ = tx.send(res.clone());
|
||||
@@ -443,7 +453,8 @@ where
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
this.update_cached_metrics();
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -459,6 +470,8 @@ where
|
||||
cache: LruMap<K, V, L>,
|
||||
/// All queued consumers
|
||||
queued: HashMap<K, Vec<S>>,
|
||||
/// Cache metrics
|
||||
metrics: CacheMetrics,
|
||||
}
|
||||
|
||||
impl<K, V, L, S> MultiConsumerLruCache<K, V, L, S>
|
||||
@@ -470,6 +483,7 @@ where
|
||||
///
|
||||
/// Returns true if this is the first queued sender for the key
|
||||
fn queue(&mut self, key: K, sender: S) -> bool {
|
||||
self.metrics.queued_consumers_count.increment(1.0);
|
||||
match self.queued.entry(key) {
|
||||
Entry::Occupied(mut entry) => {
|
||||
entry.get_mut().push(sender);
|
||||
@@ -481,17 +495,36 @@ where
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Remove consumers for a given key.
|
||||
fn remove(&mut self, key: &K) -> Option<Vec<S>> {
|
||||
match self.queued.remove(key) {
|
||||
Some(removed) => {
|
||||
self.metrics.queued_consumers_count.decrement(removed.len() as f64);
|
||||
Some(removed)
|
||||
}
|
||||
None => None,
|
||||
}
|
||||
}
|
||||
|
||||
fn update_cached_count(&self) {
|
||||
self.metrics.cached_count.set(self.cache.len() as f64);
|
||||
}
|
||||
}
|
||||
|
||||
impl<K, V, S> MultiConsumerLruCache<K, V, ByMemoryUsage, S>
|
||||
where
|
||||
K: Hash + Eq,
|
||||
{
|
||||
/// Creates a new empty map with a given `memory_budget`.
|
||||
/// Creates a new empty map with a given `memory_budget` and metric label.
|
||||
///
|
||||
/// See also [LruMap::with_memory_budget]
|
||||
fn with_memory_budget(memory_budget: usize) -> Self {
|
||||
Self { cache: LruMap::with_memory_budget(memory_budget), queued: Default::default() }
|
||||
fn new(memory_budget: usize, cache_id: &str) -> Self {
|
||||
Self {
|
||||
cache: LruMap::with_memory_budget(memory_budget),
|
||||
queued: Default::default(),
|
||||
metrics: CacheMetrics::new_with_labels(&[("cache", cache_id.to_string())]),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -541,3 +574,12 @@ where
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Metrics)]
|
||||
#[metrics(scope = "rpc.eth_cache")]
|
||||
struct CacheMetrics {
|
||||
/// The number of entities in the cache.
|
||||
cached_count: Gauge,
|
||||
/// The number of queued consumers.
|
||||
queued_consumers_count: Gauge,
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user